// Copyright © 2023 OpenIM. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package controller import ( "context" "time" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/cache" relationTb "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/relation" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/tx" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/log" "github.com/OpenIMSDK/Open-IM-Server/pkg/utils" ) type ConversationDatabase interface { // UpdateUserConversationFiled 更新用户该会话的属性信息 UpdateUsersConversationFiled(ctx context.Context, userIDs []string, conversationID string, args map[string]interface{}) error // CreateConversation 创建一批新的会话 CreateConversation(ctx context.Context, conversations []*relationTb.ConversationModel) error // SyncPeerUserPrivateConversation 同步对端私聊会话内部保证事务操作 SyncPeerUserPrivateConversationTx(ctx context.Context, conversation []*relationTb.ConversationModel) error // FindConversations 根据会话ID获取某个用户的多个会话 FindConversations(ctx context.Context, ownerUserID string, conversationIDs []string) ([]*relationTb.ConversationModel, error) // FindRecvMsgNotNotifyUserIDs 获取超级大群开启免打扰的用户ID FindRecvMsgNotNotifyUserIDs(ctx context.Context, groupID string) ([]string, error) // GetUserAllConversation 获取一个用户在服务器上所有的会话 GetUserAllConversation(ctx context.Context, ownerUserID string) ([]*relationTb.ConversationModel, error) // SetUserConversations 设置用户多个会话属性,如果会话不存在则创建,否则更新,内部保证原子性 SetUserConversations(ctx context.Context, ownerUserID string, conversations []*relationTb.ConversationModel) error // SetUsersConversationFiledTx 设置多个用户会话关于某个字段的更新操作,如果会话不存在则创建,否则更新,内部保证事务操作 SetUsersConversationFiledTx(ctx context.Context, userIDs []string, conversation *relationTb.ConversationModel, filedMap map[string]interface{}) error CreateGroupChatConversation(ctx context.Context, groupID string, userIDs []string) error GetConversationIDs(ctx context.Context, userID string) ([]string, error) GetUserConversationIDsHash(ctx context.Context, ownerUserID string) (hash uint64, err error) GetAllConversationIDs(ctx context.Context) ([]string, error) GetUserAllHasReadSeqs(ctx context.Context, ownerUserID string) (map[string]int64, error) GetConversationsByConversationID(ctx context.Context, conversationIDs []string) ([]*relationTb.ConversationModel, error) GetConversationIDsNeedDestruct(ctx context.Context) ([]*relationTb.ConversationModel, error) } func NewConversationDatabase(conversation relationTb.ConversationModelInterface, cache cache.ConversationCache, tx tx.Tx) ConversationDatabase { return &conversationDatabase{ conversationDB: conversation, cache: cache, tx: tx, } } type conversationDatabase struct { conversationDB relationTb.ConversationModelInterface cache cache.ConversationCache tx tx.Tx } func (c *conversationDatabase) SetUsersConversationFiledTx(ctx context.Context, userIDs []string, conversation *relationTb.ConversationModel, filedMap map[string]interface{}) (err error) { cache := c.cache.NewCache() if err := c.tx.Transaction(func(tx any) error { conversationTx := c.conversationDB.NewTx(tx) haveUserIDs, err := conversationTx.FindUserID(ctx, userIDs, []string{conversation.ConversationID}) if err != nil { return err } if len(haveUserIDs) > 0 { _, err = conversationTx.UpdateByMap(ctx, haveUserIDs, conversation.ConversationID, filedMap) if err != nil { return err } cache = cache.DelUsersConversation(conversation.ConversationID, haveUserIDs...) if _, ok := filedMap["has_read_seq"]; ok { for _, userID := range haveUserIDs { cache = cache.DelUserAllHasReadSeqs(userID, conversation.ConversationID) } } } NotUserIDs := utils.DifferenceString(haveUserIDs, userIDs) log.ZDebug(ctx, "SetUsersConversationFiledTx", "NotUserIDs", NotUserIDs, "haveUserIDs", haveUserIDs, "userIDs", userIDs) var conversations []*relationTb.ConversationModel now := time.Now() for _, v := range NotUserIDs { temp := new(relationTb.ConversationModel) if err := utils.CopyStructFields(temp, conversation); err != nil { return err } temp.OwnerUserID = v temp.CreateTime = now conversations = append(conversations, temp) } if len(conversations) > 0 { err = conversationTx.Create(ctx, conversations) if err != nil { return err } cache = cache.DelConversationIDs(NotUserIDs...).DelUserConversationIDsHash(NotUserIDs...).DelConversations(conversation.ConversationID, NotUserIDs...) } return nil }); err != nil { return err } return cache.ExecDel(ctx) } func (c *conversationDatabase) UpdateUsersConversationFiled(ctx context.Context, userIDs []string, conversationID string, args map[string]interface{}) error { _, err := c.conversationDB.UpdateByMap(ctx, userIDs, conversationID, args) if err != nil { return err } return c.cache.DelUsersConversation(conversationID, userIDs...).ExecDel(ctx) } func (c *conversationDatabase) CreateConversation(ctx context.Context, conversations []*relationTb.ConversationModel) error { if err := c.conversationDB.Create(ctx, conversations); err != nil { return err } var userIDs []string cache := c.cache.NewCache() for _, conversation := range conversations { cache = cache.DelConversations(conversation.OwnerUserID, conversation.ConversationID) userIDs = append(userIDs, conversation.OwnerUserID) } return cache.DelConversationIDs(userIDs...).DelUserConversationIDsHash(userIDs...).ExecDel(ctx) } func (c *conversationDatabase) SyncPeerUserPrivateConversationTx(ctx context.Context, conversations []*relationTb.ConversationModel) error { cache := c.cache.NewCache() if err := c.tx.Transaction(func(tx any) error { conversationTx := c.conversationDB.NewTx(tx) for _, conversation := range conversations { for _, v := range [][2]string{{conversation.OwnerUserID, conversation.UserID}, {conversation.UserID, conversation.OwnerUserID}} { ownerUserID := v[0] userID := v[1] haveUserIDs, err := conversationTx.FindUserID(ctx, []string{ownerUserID}, []string{conversation.ConversationID}) if err != nil { return err } if len(haveUserIDs) > 0 { _, err := conversationTx.UpdateByMap(ctx, []string{ownerUserID}, conversation.ConversationID, map[string]interface{}{"is_private_chat": conversation.IsPrivateChat}) if err != nil { return err } cache = cache.DelUsersConversation(conversation.ConversationID, ownerUserID) } else { newConversation := *conversation newConversation.OwnerUserID = ownerUserID newConversation.UserID = userID newConversation.ConversationID = conversation.ConversationID newConversation.IsPrivateChat = conversation.IsPrivateChat if err := conversationTx.Create(ctx, []*relationTb.ConversationModel{&newConversation}); err != nil { return err } cache = cache.DelConversationIDs(ownerUserID).DelUserConversationIDsHash(ownerUserID) } } } return nil }); err != nil { return err } return cache.ExecDel(ctx) } func (c *conversationDatabase) FindConversations(ctx context.Context, ownerUserID string, conversationIDs []string) ([]*relationTb.ConversationModel, error) { return c.cache.GetConversations(ctx, ownerUserID, conversationIDs) } func (c *conversationDatabase) GetConversation(ctx context.Context, ownerUserID string, conversationID string) (*relationTb.ConversationModel, error) { return c.cache.GetConversation(ctx, ownerUserID, conversationID) } func (c *conversationDatabase) GetUserAllConversation(ctx context.Context, ownerUserID string) ([]*relationTb.ConversationModel, error) { return c.cache.GetUserAllConversations(ctx, ownerUserID) } func (c *conversationDatabase) SetUserConversations(ctx context.Context, ownerUserID string, conversations []*relationTb.ConversationModel) error { cache := c.cache.NewCache() if err := c.tx.Transaction(func(tx any) error { var conversationIDs []string for _, conversation := range conversations { conversationIDs = append(conversationIDs, conversation.ConversationID) cache = cache.DelConversations(conversation.OwnerUserID, conversation.ConversationID) } conversationTx := c.conversationDB.NewTx(tx) existConversations, err := conversationTx.Find(ctx, ownerUserID, conversationIDs) if err != nil { return err } if len(existConversations) > 0 { for _, conversation := range conversations { err = conversationTx.Update(ctx, conversation) if err != nil { return err } } } var existConversationIDs []string for _, conversation := range existConversations { existConversationIDs = append(existConversationIDs, conversation.ConversationID) } var notExistConversations []*relationTb.ConversationModel for _, conversation := range conversations { if !utils.IsContain(conversation.ConversationID, existConversationIDs) { notExistConversations = append(notExistConversations, conversation) } } if len(notExistConversations) > 0 { err = c.conversationDB.Create(ctx, notExistConversations) if err != nil { return err } cache = cache.DelConversationIDs(ownerUserID).DelUserConversationIDsHash(ownerUserID) } return nil }); err != nil { return err } return cache.ExecDel(ctx) } func (c *conversationDatabase) FindRecvMsgNotNotifyUserIDs(ctx context.Context, groupID string) ([]string, error) { return c.cache.GetSuperGroupRecvMsgNotNotifyUserIDs(ctx, groupID) } func (c *conversationDatabase) CreateGroupChatConversation(ctx context.Context, groupID string, userIDs []string) error { cache := c.cache.NewCache() conversationID := utils.GetConversationIDBySessionType(constant.SuperGroupChatType, groupID) if err := c.tx.Transaction(func(tx any) error { existConversationUserIDs, err := c.conversationDB.FindUserID(ctx, userIDs, []string{conversationID}) if err != nil { return err } notExistUserIDs := utils.DifferenceString(userIDs, existConversationUserIDs) var conversations []*relationTb.ConversationModel for _, v := range notExistUserIDs { conversation := relationTb.ConversationModel{ConversationType: constant.SuperGroupChatType, GroupID: groupID, OwnerUserID: v, ConversationID: conversationID} conversations = append(conversations, &conversation) cache = cache.DelConversations(v, conversationID) } cache = cache.DelConversationIDs(notExistUserIDs...).DelUserConversationIDsHash(notExistUserIDs...) if len(conversations) > 0 { err = c.conversationDB.Create(ctx, conversations) if err != nil { return err } } _, err = c.conversationDB.UpdateByMap(ctx, existConversationUserIDs, conversationID, map[string]interface{}{"max_seq": 0}) if err != nil { return err } for _, v := range existConversationUserIDs { cache = cache.DelConversations(v, conversationID) } return nil }); err != nil { return err } return cache.ExecDel(ctx) } func (c *conversationDatabase) GetConversationIDs(ctx context.Context, userID string) ([]string, error) { return c.cache.GetUserConversationIDs(ctx, userID) } func (c *conversationDatabase) GetUserConversationIDsHash(ctx context.Context, ownerUserID string) (hash uint64, err error) { return c.cache.GetUserConversationIDsHash(ctx, ownerUserID) } func (c *conversationDatabase) GetAllConversationIDs(ctx context.Context) ([]string, error) { return c.conversationDB.GetAllConversationIDs(ctx) } func (c *conversationDatabase) GetUserAllHasReadSeqs(ctx context.Context, ownerUserID string) (map[string]int64, error) { return c.cache.GetUserAllHasReadSeqs(ctx, ownerUserID) } func (c *conversationDatabase) GetConversationsByConversationID(ctx context.Context, conversationIDs []string) ([]*relationTb.ConversationModel, error) { return c.conversationDB.GetConversationsByConversationID(ctx, conversationIDs) } func (c *conversationDatabase) GetConversationIDsNeedDestruct(ctx context.Context) ([]*relationTb.ConversationModel, error) { return c.conversationDB.GetConversationIDsNeedDestruct(ctx) }