// Copyright © 2023 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package controller
import (
"context"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
relationtb "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"time"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache"
"github.com/openimsdk/open-im-server/v3/pkg/msgprocessor"
"github.com/openimsdk/protocol/constant"
"github.com/openimsdk/tools/db/pagination"
"github.com/openimsdk/tools/db/tx"
"github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/utils/datautil"
"github.com/openimsdk/tools/utils/stringutil"
)
type ConversationDatabase interface {
// UpdateUsersConversationField updates the properties of a conversation for specified users.
UpdateUsersConversationField ( ctx context . Context , userIDs [ ] string , conversationID string , args map [ string ] any ) error
// CreateConversation creates a batch of new conversations.
CreateConversation ( ctx context . Context , conversations [ ] * relationtb . Conversation ) error
// SyncPeerUserPrivateConversationTx ensures transactional operation while syncing private conversations between peers.
SyncPeerUserPrivateConversationTx ( ctx context . Context , conversation [ ] * relationtb . Conversation ) error
// FindConversations retrieves multiple conversations of a user by conversation IDs.
FindConversations ( ctx context . Context , ownerUserID string , conversationIDs [ ] string ) ( [ ] * relationtb . Conversation , error )
// GetUserAllConversation fetches all conversations of a user on the server.
GetUserAllConversation ( ctx context . Context , ownerUserID string ) ( [ ] * relationtb . Conversation , error )
// SetUserConversations sets multiple conversation properties for a user, creates new conversations if they do not exist, or updates them otherwise. This operation is atomic.
SetUserConversations ( ctx context . Context , ownerUserID string , conversations [ ] * relationtb . Conversation ) error
// SetUsersConversationFieldTx updates a specific field for multiple users' conversations, creating new conversations if they do not exist, or updates them otherwise. This operation is
// transactional.
SetUsersConversationFieldTx ( ctx context . Context , userIDs [ ] string , conversation * relationtb . Conversation , fieldMap map [ string ] any ) error
// CreateGroupChatConversation creates a group chat conversation for the specified group ID and user IDs.
CreateGroupChatConversation ( ctx context . Context , groupID string , userIDs [ ] string ) error
// GetConversationIDs retrieves conversation IDs for a given user.
GetConversationIDs ( ctx context . Context , userID string ) ( [ ] string , error )
// GetUserConversationIDsHash gets the hash of conversation IDs for a given user.
GetUserConversationIDsHash ( ctx context . Context , ownerUserID string ) ( hash uint64 , err error )
// GetAllConversationIDs fetches all conversation IDs.
GetAllConversationIDs ( ctx context . Context ) ( [ ] string , error )
// GetAllConversationIDsNumber returns the number of all conversation IDs.
GetAllConversationIDsNumber ( ctx context . Context ) ( int64 , error )
// PageConversationIDs paginates through conversation IDs based on the specified pagination settings.
PageConversationIDs ( ctx context . Context , pagination pagination . Pagination ) ( conversationIDs [ ] string , err error )
// GetConversationsByConversationID retrieves conversations by their IDs.
GetConversationsByConversationID ( ctx context . Context , conversationIDs [ ] string ) ( [ ] * relationtb . Conversation , error )
// GetConversationIDsNeedDestruct fetches conversations that need to be destructed based on specific criteria.
GetConversationIDsNeedDestruct ( ctx context . Context ) ( [ ] * relationtb . Conversation , error )
// GetConversationNotReceiveMessageUserIDs gets user IDs for users in a conversation who have not received messages.
GetConversationNotReceiveMessageUserIDs ( ctx context . Context , conversationID string ) ( [ ] string , error )
// GetUserAllHasReadSeqs(ctx context.Context, ownerUserID string) (map[string]int64, error)
// FindRecvMsgNotNotifyUserIDs(ctx context.Context, groupID string) ([]string, error)
FindConversationUserVersion ( ctx context . Context , userID string , version uint , limit int ) ( * relationtb . VersionLog , error )
FindMaxConversationUserVersionCache ( ctx context . Context , userID string ) ( * relationtb . VersionLog , error )
GetOwnerConversation ( ctx context . Context , ownerUserID string , pagination pagination . Pagination ) ( int64 , [ ] * relationtb . Conversation , error )
}
func NewConversationDatabase ( conversation database . Conversation , cache cache . ConversationCache , tx tx . Tx ) ConversationDatabase {
return & conversationDatabase {
conversationDB : conversation ,
cache : cache ,
tx : tx ,
}
}
type conversationDatabase struct {
conversationDB database . Conversation
cache cache . ConversationCache
tx tx . Tx
}
func ( c * conversationDatabase ) SetUsersConversationFieldTx ( ctx context . Context , userIDs [ ] string , conversation * relationtb . Conversation , fieldMap map [ string ] any ) ( err error ) {
return c . tx . Transaction ( ctx , func ( ctx context . Context ) error {
cache := c . cache . CloneConversationCache ( )
if conversation . GroupID != "" {
cache = cache . DelSuperGroupRecvMsgNotNotifyUserIDs ( conversation . GroupID ) . DelSuperGroupRecvMsgNotNotifyUserIDsHash ( conversation . GroupID )
}
haveUserIDs , err := c . conversationDB . FindUserID ( ctx , userIDs , [ ] string { conversation . ConversationID } )
if err != nil {
return err
}
if len ( haveUserIDs ) > 0 {
_ , err = c . conversationDB . UpdateByMap ( ctx , haveUserIDs , conversation . ConversationID , fieldMap )
if err != nil {
return err
}
cache = cache . DelUsersConversation ( conversation . ConversationID , haveUserIDs ... )
if _ , ok := fieldMap [ "has_read_seq" ] ; ok {
for _ , userID := range haveUserIDs {
cache = cache . DelUserAllHasReadSeqs ( userID , conversation . ConversationID )
}
}
if _ , ok := fieldMap [ "recv_msg_opt" ] ; ok {
cache = cache . DelConversationNotReceiveMessageUserIDs ( conversation . ConversationID )
}
cache = cache . DelConversationVersionUserIDs ( haveUserIDs ... )
}
NotUserIDs := stringutil . DifferenceString ( haveUserIDs , userIDs )
log . ZDebug ( ctx , "SetUsersConversationFieldTx" , "NotUserIDs" , NotUserIDs , "haveUserIDs" , haveUserIDs , "userIDs" , userIDs )
var conversations [ ] * relationtb . Conversation
now := time . Now ( )
for _ , v := range NotUserIDs {
temp := new ( relationtb . Conversation )
if err = datautil . CopyStructFields ( temp , conversation ) ; err != nil {
return err
}
temp . OwnerUserID = v
temp . CreateTime = now
conversations = append ( conversations , temp )
}
if len ( conversations ) > 0 {
err = c . conversationDB . Create ( ctx , conversations )
if err != nil {
return err
}
cache = cache . DelConversationIDs ( NotUserIDs ... ) . DelUserConversationIDsHash ( NotUserIDs ... ) . DelConversations ( conversation . ConversationID , NotUserIDs ... )
}
return cache . ChainExecDel ( ctx )
} )
}
func ( c * conversationDatabase ) UpdateUsersConversationField ( ctx context . Context , userIDs [ ] string , conversationID string , args map [ string ] any ) error {
_ , err := c . conversationDB . UpdateByMap ( ctx , userIDs , conversationID , args )
if err != nil {
return err
}
cache := c . cache . CloneConversationCache ( )
cache = cache . DelUsersConversation ( conversationID , userIDs ... ) . DelConversationVersionUserIDs ( userIDs ... )
if _ , ok := args [ "recv_msg_opt" ] ; ok {
cache = cache . DelConversationNotReceiveMessageUserIDs ( conversationID )
}
return cache . ChainExecDel ( ctx )
}
func ( c * conversationDatabase ) CreateConversation ( ctx context . Context , conversations [ ] * relationtb . Conversation ) error {
if err := c . conversationDB . Create ( ctx , conversations ) ; err != nil {
return err
}
var userIDs [ ] string
cache := c . cache . CloneConversationCache ( )
for _ , conversation := range conversations {
cache = cache . DelConversations ( conversation . OwnerUserID , conversation . ConversationID )
cache = cache . DelConversationNotReceiveMessageUserIDs ( conversation . ConversationID )
userIDs = append ( userIDs , conversation . OwnerUserID )
}
return cache . DelConversationIDs ( userIDs ... ) . DelUserConversationIDsHash ( userIDs ... ) . DelConversationVersionUserIDs ( userIDs ... ) . ChainExecDel ( ctx )
}
func ( c * conversationDatabase ) SyncPeerUserPrivateConversationTx ( ctx context . Context , conversations [ ] * relationtb . Conversation ) error {
return c . tx . Transaction ( ctx , func ( ctx context . Context ) error {
cache := c . cache . CloneConversationCache ( )
for _ , conversation := range conversations {
cache = cache . DelConversationVersionUserIDs ( conversation . OwnerUserID )
for _ , v := range [ ] [ 2 ] string { { conversation . OwnerUserID , conversation . UserID } , { conversation . UserID , conversation . OwnerUserID } } {
ownerUserID := v [ 0 ]
userID := v [ 1 ]
haveUserIDs , err := c . conversationDB . FindUserID ( ctx , [ ] string { ownerUserID } , [ ] string { conversation . ConversationID } )
if err != nil {
return err
}
if len ( haveUserIDs ) > 0 {
_ , err := c . conversationDB . UpdateByMap ( ctx , [ ] string { ownerUserID } , conversation . ConversationID , map [ string ] any { "is_private_chat" : conversation . IsPrivateChat } )
if err != nil {
return err
}
cache = cache . DelUsersConversation ( conversation . ConversationID , ownerUserID )
} else {
newConversation := * conversation
newConversation . OwnerUserID = ownerUserID
newConversation . UserID = userID
newConversation . ConversationID = conversation . ConversationID
newConversation . IsPrivateChat = conversation . IsPrivateChat
if err := c . conversationDB . Create ( ctx , [ ] * relationtb . Conversation { & newConversation } ) ; err != nil {
return err
}
cache = cache . DelConversationIDs ( ownerUserID ) . DelUserConversationIDsHash ( ownerUserID )
}
}
}
return cache . ChainExecDel ( ctx )
} )
}
func ( c * conversationDatabase ) FindConversations ( ctx context . Context , ownerUserID string , conversationIDs [ ] string ) ( [ ] * relationtb . Conversation , error ) {
return c . cache . GetConversations ( ctx , ownerUserID , conversationIDs )
}
func ( c * conversationDatabase ) GetConversation ( ctx context . Context , ownerUserID string , conversationID string ) ( * relationtb . Conversation , error ) {
return c . cache . GetConversation ( ctx , ownerUserID , conversationID )
}
func ( c * conversationDatabase ) GetUserAllConversation ( ctx context . Context , ownerUserID string ) ( [ ] * relationtb . Conversation , error ) {
return c . cache . GetUserAllConversations ( ctx , ownerUserID )
}
func ( c * conversationDatabase ) SetUserConversations ( ctx context . Context , ownerUserID string , conversations [ ] * relationtb . Conversation ) error {
return c . tx . Transaction ( ctx , func ( ctx context . Context ) error {
cache := c . cache . CloneConversationCache ( )
cache = cache . DelConversationVersionUserIDs ( ownerUserID )
groupIDs := datautil . Distinct ( datautil . Filter ( conversations , func ( e * relationtb . Conversation ) ( string , bool ) {
return e . GroupID , e . GroupID != ""
} ) )
for _ , groupID := range groupIDs {
cache = cache . DelSuperGroupRecvMsgNotNotifyUserIDs ( groupID ) . DelSuperGroupRecvMsgNotNotifyUserIDsHash ( groupID )
}
var conversationIDs [ ] string
for _ , conversation := range conversations {
conversationIDs = append ( conversationIDs , conversation . ConversationID )
cache = cache . DelConversations ( conversation . OwnerUserID , conversation . ConversationID )
}
existConversations , err := c . conversationDB . Find ( ctx , ownerUserID , conversationIDs )
if err != nil {
return err
}
if len ( existConversations ) > 0 {
for _ , conversation := range conversations {
err = c . conversationDB . Update ( ctx , conversation )
if err != nil {
return err
}
}
}
var existConversationIDs [ ] string
for _ , conversation := range existConversations {
existConversationIDs = append ( existConversationIDs , conversation . ConversationID )
}
var notExistConversations [ ] * relationtb . Conversation
for _ , conversation := range conversations {
if ! datautil . Contain ( conversation . ConversationID , existConversationIDs ... ) {
notExistConversations = append ( notExistConversations , conversation )
}
}
if len ( notExistConversations ) > 0 {
err = c . conversationDB . Create ( ctx , notExistConversations )
if err != nil {
return err
}
cache = cache . DelConversationIDs ( ownerUserID ) .
DelUserConversationIDsHash ( ownerUserID ) .
DelConversationNotReceiveMessageUserIDs ( datautil . Slice ( notExistConversations , func ( e * relationtb . Conversation ) string { return e . ConversationID } ) ... )
}
return cache . ChainExecDel ( ctx )
} )
}
// func (c *conversationDatabase) FindRecvMsgNotNotifyUserIDs(ctx context.Context, groupID string) ([]string, error) {
// return c.cache.GetSuperGroupRecvMsgNotNotifyUserIDs(ctx, groupID)
//}
func ( c * conversationDatabase ) CreateGroupChatConversation ( ctx context . Context , groupID string , userIDs [ ] string ) error {
return c . tx . Transaction ( ctx , func ( ctx context . Context ) error {
cache := c . cache . CloneConversationCache ( )
conversationID := msgprocessor . GetConversationIDBySessionType ( constant . ReadGroupChatType , groupID )
existConversationUserIDs , err := c . conversationDB . FindUserID ( ctx , userIDs , [ ] string { conversationID } )
if err != nil {
return err
}
notExistUserIDs := stringutil . DifferenceString ( userIDs , existConversationUserIDs )
var conversations [ ] * relationtb . Conversation
for _ , v := range notExistUserIDs {
conversation := relationtb . Conversation { ConversationType : constant . ReadGroupChatType , GroupID : groupID , OwnerUserID : v , ConversationID : conversationID }
conversations = append ( conversations , & conversation )
cache = cache . DelConversations ( v , conversationID ) . DelConversationNotReceiveMessageUserIDs ( conversationID )
}
cache = cache . DelConversationIDs ( notExistUserIDs ... ) . DelUserConversationIDsHash ( notExistUserIDs ... )
if len ( conversations ) > 0 {
err = c . conversationDB . Create ( ctx , conversations )
if err != nil {
return err
}
}
_ , err = c . conversationDB . UpdateByMap ( ctx , existConversationUserIDs , conversationID , map [ string ] any { "max_seq" : 0 } )
if err != nil {
return err
}
for _ , v := range existConversationUserIDs {
cache = cache . DelConversations ( v , conversationID )
}
return cache . ChainExecDel ( ctx )
} )
}
func ( c * conversationDatabase ) GetConversationIDs ( ctx context . Context , userID string ) ( [ ] string , error ) {
return c . cache . GetUserConversationIDs ( ctx , userID )
}
func ( c * conversationDatabase ) GetUserConversationIDsHash ( ctx context . Context , ownerUserID string ) ( hash uint64 , err error ) {
return c . cache . GetUserConversationIDsHash ( ctx , ownerUserID )
}
func ( c * conversationDatabase ) GetAllConversationIDs ( ctx context . Context ) ( [ ] string , error ) {
return c . conversationDB . GetAllConversationIDs ( ctx )
}
func ( c * conversationDatabase ) GetAllConversationIDsNumber ( ctx context . Context ) ( int64 , error ) {
return c . conversationDB . GetAllConversationIDsNumber ( ctx )
}
func ( c * conversationDatabase ) PageConversationIDs ( ctx context . Context , pagination pagination . Pagination ) ( [ ] string , error ) {
return c . conversationDB . PageConversationIDs ( ctx , pagination )
}
func ( c * conversationDatabase ) GetConversationsByConversationID ( ctx context . Context , conversationIDs [ ] string ) ( [ ] * relationtb . Conversation , error ) {
return c . conversationDB . GetConversationsByConversationID ( ctx , conversationIDs )
}
func ( c * conversationDatabase ) GetConversationIDsNeedDestruct ( ctx context . Context ) ( [ ] * relationtb . Conversation , error ) {
return c . conversationDB . GetConversationIDsNeedDestruct ( ctx )
}
func ( c * conversationDatabase ) GetConversationNotReceiveMessageUserIDs ( ctx context . Context , conversationID string ) ( [ ] string , error ) {
return c . cache . GetConversationNotReceiveMessageUserIDs ( ctx , conversationID )
}
func ( c * conversationDatabase ) FindConversationUserVersion ( ctx context . Context , userID string , version uint , limit int ) ( * relationtb . VersionLog , error ) {
return c . conversationDB . FindConversationUserVersion ( ctx , userID , version , limit )
}
func ( c * conversationDatabase ) FindMaxConversationUserVersionCache ( ctx context . Context , userID string ) ( * relationtb . VersionLog , error ) {
return c . cache . FindMaxConversationUserVersion ( ctx , userID )
}
func ( c * conversationDatabase ) GetOwnerConversation ( ctx context . Context , ownerUserID string , pagination pagination . Pagination ) ( int64 , [ ] * relationtb . Conversation , error ) {
conversationIDs , err := c . cache . GetUserConversationIDs ( ctx , ownerUserID )
if err != nil {
return 0 , nil , err
}
findConversationIDs := datautil . Paginate ( conversationIDs , int ( pagination . GetPageNumber ( ) ) , int ( pagination . GetShowNumber ( ) ) )
conversations := make ( [ ] * relationtb . Conversation , 0 , len ( findConversationIDs ) )
for _ , conversationID := range findConversationIDs {
conversation , err := c . cache . GetConversation ( ctx , ownerUserID , conversationID )
if err != nil {
return 0 , nil , err
}
conversations = append ( conversations , conversation )
}
return int64 ( len ( conversationIDs ) ) , conversations , nil
}