conversation update

test-errcode
Gordon 2 years ago
parent f985689a39
commit a3593379f6

@ -7,7 +7,7 @@ import (
"Open_IM/pkg/common/db/cache" "Open_IM/pkg/common/db/cache"
"Open_IM/pkg/common/db/controller" "Open_IM/pkg/common/db/controller"
"Open_IM/pkg/common/db/relation" "Open_IM/pkg/common/db/relation"
"Open_IM/pkg/common/db/table" tableRelation "Open_IM/pkg/common/db/table/relation"
"Open_IM/pkg/common/db/unrelation" "Open_IM/pkg/common/db/unrelation"
"Open_IM/pkg/common/log" "Open_IM/pkg/common/log"
promePkg "Open_IM/pkg/common/prometheus" promePkg "Open_IM/pkg/common/prometheus"
@ -50,7 +50,7 @@ func NewConversationServer(port int) *conversationServer {
var cCache cache.ConversationCache var cCache cache.ConversationCache
//mysql init //mysql init
var mysql relation.Mysql var mysql relation.Mysql
err := mysql.InitConn().AutoMigrateModel(&table.ConversationModel{}) err := mysql.InitConn().AutoMigrateModel(&tableRelation.ConversationModel{})
if err != nil { if err != nil {
panic("db init err:" + err.Error()) panic("db init err:" + err.Error())
} }
@ -173,7 +173,7 @@ func (c *conversationServer) GetConversations(ctx context.Context, req *pbConver
func (c *conversationServer) BatchSetConversations(ctx context.Context, req *pbConversation.BatchSetConversationsReq) (*pbConversation.BatchSetConversationsResp, error) { func (c *conversationServer) BatchSetConversations(ctx context.Context, req *pbConversation.BatchSetConversationsReq) (*pbConversation.BatchSetConversationsResp, error) {
resp := &pbConversation.BatchSetConversationsResp{} resp := &pbConversation.BatchSetConversationsResp{}
var conversations []*table.ConversationModel var conversations []*tableRelation.ConversationModel
if err := utils.CopyStructFields(&conversations, req.Conversations); err != nil { if err := utils.CopyStructFields(&conversations, req.Conversations); err != nil {
return nil, err return nil, err
} }
@ -206,16 +206,16 @@ func (c *conversationServer) ModifyConversationField(ctx context.Context, req *p
return nil, err return nil, err
} }
} }
var conversation table.ConversationModel var conversation tableRelation.ConversationModel
if err := utils.CopyStructFields(&conversation, req.Conversation); err != nil { if err := utils.CopyStructFields(&conversation, req.Conversation); err != nil {
return nil, err return nil, err
} }
if req.FieldType == constant.FieldIsPrivateChat { if req.FieldType == constant.FieldIsPrivateChat {
err := c.ConversationInterface.SyncPeerUserPrivateConversationTx(ctx, req.Conversation) err := c.ConversationInterface.SyncPeerUserPrivateConversationTx(ctx, &conversation)
if err != nil { if err != nil {
return nil, err return nil, err
} }
chat.ConversationSetPrivateNotification(req.OperationID, req.Conversation.OwnerUserID, req.Conversation.UserID, req.Conversation.IsPrivateChat) chat.ConversationSetPrivateNotification(ctx, req.Conversation.OwnerUserID, req.Conversation.UserID, req.Conversation.IsPrivateChat)
return resp, nil return resp, nil
} }
//haveUserID, err := c.ConversationInterface.GetUserIDExistConversation(ctx, req.UserIDList, req.Conversation.ConversationID) //haveUserID, err := c.ConversationInterface.GetUserIDExistConversation(ctx, req.UserIDList, req.Conversation.ConversationID)
@ -242,29 +242,18 @@ func (c *conversationServer) ModifyConversationField(ctx context.Context, req *p
case constant.FieldBurnDuration: case constant.FieldBurnDuration:
filedMap["burn_duration"] = req.Conversation.BurnDuration filedMap["burn_duration"] = req.Conversation.BurnDuration
} }
c.ConversationInterface.SetUsersConversationFiledTx(ctx, req.UserIDList, &conversation, filedMap) err = c.ConversationInterface.SetUsersConversationFiledTx(ctx, req.UserIDList, &conversation, filedMap)
err = c.ConversationInterface.UpdateUsersConversationFiled(ctx, haveUserID, req.Conversation.ConversationID, filedMap)
if err != nil {
return nil, err
}
var conversations []*pbConversation.Conversation
for _, v := range utils.DifferenceString(haveUserID, req.UserIDList) {
temp := new(pbConversation.Conversation)
_ = utils.CopyStructFields(temp, req.Conversation)
temp.OwnerUserID = v
conversations = append(conversations, temp)
}
err = c.ConversationInterface.CreateConversation(ctx, conversations)
if err != nil { if err != nil {
return nil, err return nil, err
} }
if isSyncConversation { if isSyncConversation {
for _, v := range req.UserIDList { for _, v := range req.UserIDList {
chat.ConversationChangeNotification(req.OperationID, v) chat.ConversationChangeNotification(ctx, v)
} }
} else { } else {
for _, v := range req.UserIDList { for _, v := range req.UserIDList {
chat.ConversationUnreadChangeNotification(req.OperationID, v, req.Conversation.ConversationID, req.Conversation.UpdateUnreadCountTime) chat.ConversationUnreadChangeNotification(ctx, v, req.Conversation.ConversationID, req.Conversation.UpdateUnreadCountTime)
} }
} }
return resp, nil return resp, nil

@ -41,7 +41,7 @@ func SetConversationNotification(operationID, sendID, recvID string, contentType
} }
// SetPrivate调用 // SetPrivate调用
func ConversationSetPrivateNotification(operationID, sendID, recvID string, isPrivateChat bool) { func ConversationSetPrivateNotification(ctx context.Context, sendID, recvID string, isPrivateChat bool) {
log.NewInfo(operationID, utils.GetSelfFuncName()) log.NewInfo(operationID, utils.GetSelfFuncName())
conversationSetPrivateTips := &open_im_sdk.ConversationSetPrivateTips{ conversationSetPrivateTips := &open_im_sdk.ConversationSetPrivateTips{
RecvID: recvID, RecvID: recvID,
@ -71,7 +71,7 @@ func ConversationChangeNotification(ctx context.Context, userID string) {
} }
//会话未读数同步 //会话未读数同步
func ConversationUnreadChangeNotification(operationID, userID, conversationID string, updateUnreadCountTime int64) { func ConversationUnreadChangeNotification(context context.Context, userID, conversationID string, updateUnreadCountTime int64) {
log.NewInfo(operationID, utils.GetSelfFuncName()) log.NewInfo(operationID, utils.GetSelfFuncName())
ConversationChangedTips := &open_im_sdk.ConversationUpdateTips{ ConversationChangedTips := &open_im_sdk.ConversationUpdateTips{
UserID: userID, UserID: userID,

@ -20,21 +20,25 @@ const (
superGroupRecvMsgNotNotifyUserIDsKey = "SUPER_GROUP_RECV_MSG_NOT_NOTIFY_USER_IDS:" superGroupRecvMsgNotNotifyUserIDsKey = "SUPER_GROUP_RECV_MSG_NOT_NOTIFY_USER_IDS:"
conversationExpireTime = time.Second * 60 * 60 * 12 conversationExpireTime = time.Second * 60 * 60 * 12
) )
type FuncDB func() (string, error)
// args fn will exec when no data in cache // args fn will exec when no data in cache
type ConversationCache interface { type ConversationCache interface {
// get user's conversationIDs from cache // get user's conversationIDs from cache
GetUserConversationIDs(ctx context.Context, userID string, fn func(ctx context.Context, userID string) ([]string, error)) ([]string, error) GetUserConversationIDs(ctx context.Context, userID string, fn FuncDB) ([]string, error)
// del user's conversationIDs from cache, call when a user add or reduce a conversation // del user's conversationIDs from cache, call when a user add or reduce a conversation
DelUserConversationIDs(ctx context.Context, userID string) error DelUserConversationIDs(ctx context.Context, userID string) error
DelUsersConversationIDs(ctx context.Context,userIDList []string)error
// get one conversation from cache // get one conversation from cache
GetConversation(ctx context.Context, ownerUserID, conversationID string, fn func(ctx context.Context, ownerUserID, conversationID string) (*relationTb.ConversationModel, error)) (*relationTb.ConversationModel, error) GetConversation(ctx context.Context, ownerUserID, conversationID string, fn FuncDB) (*relationTb.ConversationModel, error)
// get one conversation from cache // get one conversation from cache
GetConversations(ctx context.Context, ownerUserID string, conversationIDs []string, fn func(ctx context.Context, ownerUserID, conversationIDs []string) ([]*relationTb.ConversationModel, error)) ([]*relationTb.ConversationModel, error) GetConversations(ctx context.Context, ownerUserID string, conversationIDs []string, fn FuncDB)([]*relationTb.ConversationModel, error)
// get one user's all conversations from cache // get one user's all conversations from cache
GetUserAllConversations(ctx context.Context, ownerUserID string, fn func(ctx context.Context, ownerUserIDs string) ([]*relationTb.ConversationModel, error)) ([]*relationTb.ConversationModel, error) GetUserAllConversations(ctx context.Context, ownerUserID string, fn FuncDB ) ([]*relationTb.ConversationModel, error)
// del one conversation from cache, call when one user's conversation Info changed // del one conversation from cache, call when one user's conversation Info changed
DelConversation(ctx context.Context, ownerUserID, conversationID string) error DelConversation(ctx context.Context, ownerUserID, conversationID string) error
DelUserConversations(ctx context.Context, ownerUserID string, conversationIDList []string) error
DelUsersConversation(ctx context.Context, ownerUserIDList []string, conversationID string) error
// get user conversation recv msg from cache // get user conversation recv msg from cache
GetUserRecvMsgOpt(ctx context.Context, ownerUserID, conversationID string, fn func(ctx context.Context, ownerUserID, conversationID string) (opt int, err error)) (opt int, err error) GetUserRecvMsgOpt(ctx context.Context, ownerUserID, conversationID string, fn func(ctx context.Context, ownerUserID, conversationID string) (opt int, err error)) (opt int, err error)
// del user recv msg opt from cache, call when user's conversation recv msg opt changed // del user recv msg opt from cache, call when user's conversation recv msg opt changed
@ -51,6 +55,38 @@ type ConversationRedis struct {
rcClient *rockscache.Client rcClient *rockscache.Client
} }
func (c *ConversationRedis) GetUserConversationIDs(ctx context.Context, userID string, fn func(ctx context.Context, userID string) ([]string, error)) ([]string, error) {
panic("implement me")
}
func (c *ConversationRedis) DelUsersConversationIDs(ctx context.Context, userIDList []string) error {
panic("implement me")
}
func (c *ConversationRedis) GetConversation(ctx context.Context, ownerUserID, conversationID string, fn func(ctx context.Context, ownerUserID string, conversationID string) (*relationTb.ConversationModel, error)) (*relationTb.ConversationModel, error) {
panic("implement me")
}
func (c *ConversationRedis) GetConversations(ctx context.Context, ownerUserID string, conversationIDs []string, fn FuncDB) ([]*relationTb.ConversationModel, error) {
panic("implement me")
}
func (c *ConversationRedis) GetUserAllConversations(ctx context.Context, ownerUserID string, fn FuncDB) ([]*relationTb.ConversationModel, error) {
panic("implement me")
}
func (c *ConversationRedis) DelUsersConversation(ctx context.Context, ownerUserIDList []string, conversationID string) error {
panic("implement me")
}
func (c *ConversationRedis) GetUserRecvMsgOpt(ctx context.Context, ownerUserID, conversationID string, fn func(ctx context.Context, ownerUserID string, conversationID string) (opt int, err error)) (opt int, err error) {
panic("implement me")
}
func (c *ConversationRedis) GetSuperGroupRecvMsgNotNotifyUserIDs(ctx context.Context, groupID string, fn func(ctx context.Context, groupID string) (userIDs []string, err error)) (userIDs []string, err error) {
panic("implement me")
}
func NewConversationRedis(rcClient *rockscache.Client) *ConversationRedis { func NewConversationRedis(rcClient *rockscache.Client) *ConversationRedis {
return &ConversationRedis{rcClient: rcClient} return &ConversationRedis{rcClient: rcClient}
} }

@ -1,32 +1,42 @@
package controller package controller
import ( import (
"Open_IM/pkg/common/constant"
"Open_IM/pkg/common/db/cache" "Open_IM/pkg/common/db/cache"
"Open_IM/pkg/common/db/relation" "Open_IM/pkg/common/db/relation"
relationTb "Open_IM/pkg/common/db/table/relation" relationTb "Open_IM/pkg/common/db/table/relation"
"Open_IM/pkg/utils"
"context" "context"
"encoding/json"
"gorm.io/gorm"
) )
type ConversationInterface interface { type ConversationInterface interface {
//GetUserIDExistConversation 获取拥有该会话的的用户ID列表 //GetUserIDExistConversation 获取拥有该会话的的用户ID列表
GetUserIDExistConversation(ctx context.Context, userIDList []string, conversationID string) ([]string, error) GetUserIDExistConversation(ctx context.Context, userIDList []string, conversationID string) ([]string, error)
//UpdateUserConversationFiled 更新用户该会话的属性信息 //UpdateUserConversationFiled 更新用户该会话的属性信息
UpdateUsersConversationFiled(ctx context.Context, UserIDList []string, conversationID string, args map[string]interface{}) error UpdateUsersConversationFiled(ctx context.Context, userIDList []string, conversationID string, args map[string]interface{}) error
//CreateConversation 创建一批新的会话 //CreateConversation 创建一批新的会话
CreateConversation(ctx context.Context, conversations []*relationTb.ConversationModel) error CreateConversation(ctx context.Context, conversations []*relationTb.ConversationModel) error
//SyncPeerUserPrivateConversation 同步对端私聊会话内部保证事务操作 //SyncPeerUserPrivateConversation 同步对端私聊会话内部保证事务操作
SyncPeerUserPrivateConversationTx(ctx context.Context, conversation *relationTb.ConversationModel) error SyncPeerUserPrivateConversationTx(ctx context.Context, conversation *relationTb.ConversationModel) error
//FindConversations 根据会话ID获取某个用户的多个会话 //FindConversations 根据会话ID获取某个用户的多个会话
FindConversations(ctx context.Context, ownerUserID string, conversationID []string) ([]*relationTb.ConversationModel, error) FindConversations(ctx context.Context, ownerUserID string, conversationIDs []string) ([]*relationTb.ConversationModel, error)
//GetUserAllConversation 获取一个用户在服务器上所有的会话 //GetUserAllConversation 获取一个用户在服务器上所有的会话
GetUserAllConversation(ctx context.Context, ownerUserID string) ([]*relationTb.ConversationModel, error) GetUserAllConversation(ctx context.Context, ownerUserID string) ([]*relationTb.ConversationModel, error)
//SetUserConversations 设置用户多个会话属性,如果会话不存在则创建,否则更新,内部保证原子性 //SetUserConversations 设置用户多个会话属性,如果会话不存在则创建,否则更新,内部保证原子性
SetUserConversations(ctx context.Context, ownerUserID string, conversations []*relationTb.ConversationModel) error SetUserConversations(ctx context.Context, ownerUserID string, conversations []*relationTb.ConversationModel) error
//SetUsersConversationFiledTx 设置多个用户会话关于某个字段的更新操作,如果会话不存在则创建,否则更新,内部保证事务操作
SetUsersConversationFiledTx(ctx context.Context, userIDList []string, conversation *relationTb.ConversationModel, filedMap map[string]interface{}) error
} }
type ConversationController struct { type ConversationController struct {
database ConversationDataBaseInterface database ConversationDataBaseInterface
} }
func (c *ConversationController) SetUsersConversationFiledTx(ctx context.Context, userIDList []string, conversation *relationTb.ConversationModel, filedMap map[string]interface{}) error {
return c.database.SetUsersConversationFiledTx(ctx, userIDList, conversation, filedMap)
}
func NewConversationController(database ConversationDataBaseInterface) *ConversationController { func NewConversationController(database ConversationDataBaseInterface) *ConversationController {
return &ConversationController{database: database} return &ConversationController{database: database}
} }
@ -36,26 +46,26 @@ func (c *ConversationController) GetUserIDExistConversation(ctx context.Context,
} }
func (c ConversationController) UpdateUsersConversationFiled(ctx context.Context, UserIDList []string, conversationID string, args map[string]interface{}) error { func (c ConversationController) UpdateUsersConversationFiled(ctx context.Context, UserIDList []string, conversationID string, args map[string]interface{}) error {
panic("implement me") return c.database.UpdateUsersConversationFiled(ctx, UserIDList, conversationID, args)
} }
func (c ConversationController) CreateConversation(ctx context.Context, conversations []*relationTb.ConversationModel) error { func (c ConversationController) CreateConversation(ctx context.Context, conversations []*relationTb.ConversationModel) error {
panic("implement me") return c.database.CreateConversation(ctx, conversations)
} }
func (c ConversationController) SyncPeerUserPrivateConversationTx(ctx context.Context, conversation *relationTb.ConversationModel) error { func (c ConversationController) SyncPeerUserPrivateConversationTx(ctx context.Context, conversation *relationTb.ConversationModel) error {
panic("implement me") return c.database.SyncPeerUserPrivateConversationTx(ctx, conversation)
} }
func (c ConversationController) FindConversations(ctx context.Context, ownerUserID string, conversationID []string) ([]*relationTb.ConversationModel, error) { func (c ConversationController) FindConversations(ctx context.Context, ownerUserID string, conversationIDs []string) ([]*relationTb.ConversationModel, error) {
panic("implement me") return c.database.FindConversations(ctx, ownerUserID, conversationIDs)
} }
func (c ConversationController) GetUserAllConversation(ctx context.Context, ownerUserID string) ([]*relationTb.ConversationModel, error) { func (c ConversationController) GetUserAllConversation(ctx context.Context, ownerUserID string) ([]*relationTb.ConversationModel, error) {
panic("implement me") return c.database.GetUserAllConversation(ctx, ownerUserID)
} }
func (c ConversationController) SetUserConversations(ctx context.Context, ownerUserID string, conversations []*relationTb.ConversationModel) error { func (c ConversationController) SetUserConversations(ctx context.Context, ownerUserID string, conversations []*relationTb.ConversationModel) error {
panic("implement me") return c.database.SetUserConversations(ctx, ownerUserID, conversations)
} }
var _ ConversationInterface = (*ConversationController)(nil) var _ ConversationInterface = (*ConversationController)(nil)
@ -70,19 +80,69 @@ type ConversationDataBaseInterface interface {
//SyncPeerUserPrivateConversation 同步对端私聊会话内部保证事务操作 //SyncPeerUserPrivateConversation 同步对端私聊会话内部保证事务操作
SyncPeerUserPrivateConversationTx(ctx context.Context, conversation *relationTb.ConversationModel) error SyncPeerUserPrivateConversationTx(ctx context.Context, conversation *relationTb.ConversationModel) error
//FindConversations 根据会话ID获取某个用户的多个会话 //FindConversations 根据会话ID获取某个用户的多个会话
FindConversations(ctx context.Context, ownerUserID string, conversationID []string) ([]*relationTb.ConversationModel, error) FindConversations(ctx context.Context, ownerUserID string, conversationIDs []string) ([]*relationTb.ConversationModel, error)
//GetUserAllConversation 获取一个用户在服务器上所有的会话 //GetUserAllConversation 获取一个用户在服务器上所有的会话
GetUserAllConversation(ctx context.Context, ownerUserID string) ([]*relationTb.ConversationModel, error) GetUserAllConversation(ctx context.Context, ownerUserID string) ([]*relationTb.ConversationModel, error)
//SetUserConversations 设置用户多个会话属性,如果会话不存在则创建,否则更新,内部保证原子性 //SetUserConversations 设置用户多个会话属性,如果会话不存在则创建,否则更新,内部保证原子性
SetUserConversations(ctx context.Context, ownerUserID string, conversations []*relationTb.ConversationModel) error SetUserConversations(ctx context.Context, ownerUserID string, conversations []*relationTb.ConversationModel) error
//SetUsersConversationFiledTx 设置多个用户会话关于某个字段的更新操作,如果会话不存在则创建,否则更新,内部保证事务操作
SetUsersConversationFiledTx(ctx context.Context, userIDList []string, conversation *relationTb.ConversationModel, filedMap map[string]interface{}) error
} }
var _ ConversationDataBaseInterface = (*ConversationDataBase)(nil)
type ConversationDataBase struct { type ConversationDataBase struct {
db relation.Conversation conversationDB relation.Conversation
cache cache.ConversationCache cache cache.ConversationCache
db *gorm.DB
}
func (c ConversationDataBase) SetUsersConversationFiledTx(ctx context.Context, userIDList []string, conversation *relationTb.ConversationModel, filedMap map[string]interface{}) error {
return c.db.Transaction(func(tx *gorm.DB) error {
haveUserID, err := c.conversationDB.FindUserID(ctx, userIDList, conversation.ConversationID, tx)
if err != nil {
return err
}
if len(haveUserID) > 0 {
err = c.conversationDB.UpdateByMap(ctx, haveUserID, conversation.ConversationID, filedMap, tx)
if err != nil {
return err
}
}
NotUserID := utils.DifferenceString(haveUserID, userIDList)
var cList []*relationTb.ConversationModel
for _, v := range NotUserID {
temp := new(relationTb.ConversationModel)
if err := utils.CopyStructFields(temp, conversation); err != nil {
return err
}
temp.OwnerUserID = v
cList = append(cList, temp)
}
err = c.conversationDB.Create(ctx, cList)
if err != nil {
return err
}
if len(NotUserID) > 0 {
err = c.cache.DelUsersConversationIDs(ctx, NotUserID)
if err != nil {
return err
}
}
err = c.cache.DelUsersConversation(ctx, haveUserID, conversation.ConversationID)
if err != nil {
return err
}
return nil
})
}
func NewConversationDataBase(db relation.Conversation, cache cache.ConversationCache) *ConversationDataBase {
return &ConversationDataBase{conversationDB: db, cache: cache}
} }
func (c ConversationDataBase) GetUserIDExistConversation(ctx context.Context, userIDList []string, conversationID string) ([]string, error) { func (c ConversationDataBase) GetUserIDExistConversation(ctx context.Context, userIDList []string, conversationID string) ([]string, error) {
panic("implement me")
} }
func (c ConversationDataBase) UpdateUsersConversationFiled(ctx context.Context, UserIDList []string, conversationID string, args map[string]interface{}) error { func (c ConversationDataBase) UpdateUsersConversationFiled(ctx context.Context, UserIDList []string, conversationID string, args map[string]interface{}) error {
@ -94,26 +154,155 @@ func (c ConversationDataBase) CreateConversation(ctx context.Context, conversati
} }
func (c ConversationDataBase) SyncPeerUserPrivateConversationTx(ctx context.Context, conversation *relationTb.ConversationModel) error { func (c ConversationDataBase) SyncPeerUserPrivateConversationTx(ctx context.Context, conversation *relationTb.ConversationModel) error {
panic("implement me") return c.db.Transaction(func(tx *gorm.DB) error {
userIDList := []string{conversation.OwnerUserID, conversation.UserID}
haveUserID, err := c.conversationDB.FindUserID(ctx, userIDList, conversation.ConversationID, tx)
if err != nil {
return err
}
filedMap := map[string]interface{}{"is_private_chat": conversation.IsPrivateChat}
if len(haveUserID) > 0 {
err = c.conversationDB.UpdateByMap(ctx, haveUserID, conversation.ConversationID, filedMap, tx)
if err != nil {
return err
}
}
NotUserID := utils.DifferenceString(haveUserID, userIDList)
var cList []*relationTb.ConversationModel
for _, v := range NotUserID {
temp := new(relationTb.ConversationModel)
if v == conversation.UserID {
temp.OwnerUserID = conversation.UserID
temp.ConversationID = utils.GetConversationIDBySessionType(conversation.OwnerUserID, constant.SingleChatType)
temp.ConversationType = constant.SingleChatType
temp.UserID = conversation.OwnerUserID
temp.IsPrivateChat = conversation.IsPrivateChat
} else {
if err := utils.CopyStructFields(temp, conversation); err != nil {
return err
}
temp.OwnerUserID = v
}
cList = append(cList, temp)
}
if len(NotUserID) > 0 {
err = c.conversationDB.Create(ctx, cList)
if err != nil {
return err
}
}
err = c.cache.DelUsersConversationIDs(ctx, NotUserID)
if err != nil {
return err
}
err = c.cache.DelUsersConversation(ctx, haveUserID, conversation.ConversationID)
if err != nil {
return err
}
return nil
})
} }
func (c ConversationDataBase) FindConversations(ctx context.Context, ownerUserID string, conversationID []string) ([]*relationTb.ConversationModel, error) { func (c ConversationDataBase) FindConversations(ctx context.Context, ownerUserID string, conversationIDs []string) ([]*relationTb.ConversationModel, error) {
panic("implement me") getConversation := func() (string, error) {
conversationList, err := c.conversationDB.Find(ctx, ownerUserID, conversationIDs)
if err != nil {
return "", utils.Wrap(err, "get failed")
}
bytes, err := json.Marshal(conversationList)
if err != nil {
return "", utils.Wrap(err, "Marshal failed")
}
return string(bytes), nil
}
return c.cache.GetConversations(ctx, ownerUserID, conversationIDs, getConversation)
}
func (c ConversationDataBase) GetConversation(ctx context.Context, ownerUserID string, conversationID string) (*relationTb.ConversationModel, error) {
getConversation := func() (string, error) {
conversationList, err := c.conversationDB.Take(ctx, ownerUserID, conversationID)
if err != nil {
return "", utils.Wrap(err, "get failed")
}
bytes, err := json.Marshal(conversationList)
if err != nil {
return "", utils.Wrap(err, "Marshal failed")
}
return string(bytes), nil
}
return c.cache.GetConversation(ctx, ownerUserID, conversationID, getConversation)
} }
func (c ConversationDataBase) GetUserAllConversation(ctx context.Context, ownerUserID string) ([]*relationTb.ConversationModel, error) { func (c ConversationDataBase) GetUserAllConversation(ctx context.Context, ownerUserID string) ([]*relationTb.ConversationModel, error) {
panic("implement me") getConversationIDList := func() (string, error) {
conversationIDList, err := c.conversationDB.FindUserIDAllConversationID(ctx, ownerUserID)
if err != nil {
return "", utils.Wrap(err, "getConversationIDList failed")
}
bytes, err := json.Marshal(conversationIDList)
if err != nil {
return "", utils.Wrap(err, "")
}
return string(bytes), nil
}
conversationIDList, err := c.cache.GetUserConversationIDs(ctx, ownerUserID, getConversationIDList)
if err != nil {
return nil, err
}
var conversations []*relationTb.ConversationModel
for _, conversationID := range conversationIDList {
conversation, tErr := c.GetConversation(ctx, ownerUserID, conversationID)
if tErr != nil {
return nil, utils.Wrap(tErr, "GetConversation failed")
}
conversations = append(conversations, conversation)
}
return conversations, nil
} }
func (c ConversationDataBase) SetUserConversations(ctx context.Context, ownerUserID string, conversations []*relationTb.ConversationModel) error { func (c ConversationDataBase) SetUserConversations(ctx context.Context, ownerUserID string, conversations []*relationTb.ConversationModel) error {
panic("implement me") return c.db.Transaction(func(tx *gorm.DB) error {
} var conversationIDList []string
for _, conversation := range conversations {
conversationIDList = append(conversationIDList, conversation.ConversationID)
}
haveConversations, err := c.conversationDB.Find(ctx, ownerUserID, conversationIDList, tx)
if err != nil {
return err
}
if len(haveConversations) > 0 {
err = c.conversationDB.Update(ctx, conversations, tx)
if err != nil {
return err
}
}
var haveConversationID []string
for _, conversation := range haveConversations {
haveConversationID = append(haveConversationID, conversation.ConversationID)
}
func NewConversationDataBase(db relation.Conversation, cache cache.ConversationCache) *ConversationDataBase { NotConversationID := utils.DifferenceString(haveConversationID, conversationIDList)
return &ConversationDataBase{db: db, cache: cache} var NotConversations []*relationTb.ConversationModel
for _, conversation := range conversations {
if !utils.IsContain(conversation.ConversationID, haveConversationID) {
NotConversations = append(NotConversations, conversation)
}
}
if len(NotConversations) > 0 {
err = c.conversationDB.Create(ctx, NotConversations)
if err != nil {
return err
}
}
err = c.cache.DelUsersConversationIDs(ctx, NotConversationID)
if err != nil {
return err
}
err = c.cache.DelUserConversations(ctx, ownerUserID, haveConversationID)
if err != nil {
return err
}
return nil
})
} }
//func NewConversationController(db *gorm.DB, rdb redis.UniversalClient) ConversationInterface {
// groupController := &ConversationController{database: newGroupDatabase(db, rdb, mgoClient)}
// return groupController
//}

@ -1,7 +1,7 @@
package relation package relation
import ( import (
"Open_IM/pkg/common/db/table" "Open_IM/pkg/common/db/table/relation"
"Open_IM/pkg/common/tracelog" "Open_IM/pkg/common/tracelog"
"Open_IM/pkg/utils" "Open_IM/pkg/utils"
"context" "context"
@ -10,12 +10,15 @@ import (
type Conversation interface { type Conversation interface {
TableName() string TableName() string
Create(ctx context.Context, conversations []*table.ConversationModel) (err error) Create(ctx context.Context, conversations []*relation.ConversationModel, tx ...any) (err error)
Delete(ctx context.Context, groupIDs []string) (err error) Delete(ctx context.Context, groupIDs []string) (err error)
UpdateByMap(ctx context.Context, groupID string, args map[string]interface{}) (err error) UpdateByMap(ctx context.Context, userIDList []string, conversationID string, args map[string]interface{}, tx ...any) (err error)
Update(ctx context.Context, groups []*table.ConversationModel) (err error) Update(ctx context.Context, conversations []*relation.ConversationModel, tx ...any) (err error)
Find(ctx context.Context, groupIDs []string) (groups []*table.ConversationModel, err error) Find(ctx context.Context, ownerUserID string, conversationIDs []string, tx ...any) (conversations []*relation.ConversationModel, err error)
Take(ctx context.Context, groupID string) (group *table.ConversationModel, err error) FindUserID(ctx context.Context, userIDList []string, conversationID string, tx ...any) ([]string, error)
FindUserIDAllConversationID(ctx context.Context, userID string, tx ...any) ([]string, error)
Take(ctx context.Context, userID, conversationID string, tx ...any) (conversation *relation.ConversationModel, err error)
FindConversationID(ctx context.Context, userID string, conversationIDList []string, tx ...any) (existConversationID []string, err error)
} }
type ConversationGorm struct { type ConversationGorm struct {
DB *gorm.DB DB *gorm.DB
@ -29,45 +32,69 @@ func NewConversationGorm(DB *gorm.DB) Conversation {
return &ConversationGorm{DB: DB} return &ConversationGorm{DB: DB}
} }
func (c *ConversationGorm) Create(ctx context.Context, conversations []*table.ConversationModel) (err error) { func (c *ConversationGorm) Create(ctx context.Context, conversations []*relation.ConversationModel, tx ...any) (err error) {
defer func() { defer func() {
tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "conversations", conversations) tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "conversations", conversations)
}() }()
return utils.Wrap(getDBConn(g.DB, tx).Create(&conversations).Error, "") return utils.Wrap(getDBConn(c.DB, tx).Create(&conversations).Error, "")
} }
func (c *ConversationGorm) Delete(ctx context.Context, groupIDs []string) (err error) { func (c *ConversationGorm) Delete(ctx context.Context, groupIDs []string) (err error) {
defer func() { defer func() {
tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "groupIDs", groupIDs) tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "groupIDs", groupIDs)
}() }()
return utils.Wrap(getDBConn(g.DB, tx).Where("group_id in (?)", groupIDs).Delete(&table.ConversationModel{}).Error, "") return utils.Wrap(getDBConn(g.DB, tx).Where("group_id in (?)", groupIDs).Delete(&relation.ConversationModel{}).Error, "")
} }
func (c *ConversationGorm) UpdateByMap(ctx context.Context, groupID string, args map[string]interface{}) (err error) { func (c *ConversationGorm) UpdateByMap(ctx context.Context, userIDList []string, conversationID string, args map[string]interface{}, tx ...any) (err error) {
defer func() { defer func() {
tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "groupID", groupID, "args", args) tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "userIDList", userIDList, "conversationID", conversationID)
}() }()
return utils.Wrap(getDBConn(g.DB, tx).Where("group_id = ?", groupID).Model(g).Updates(args).Error, "") return utils.Wrap(getDBConn(c.DB, tx).Model(&relation.ConversationModel{}).Where("owner_user_id IN (?) and conversation_id=?", userIDList, conversationID).Updates(args).Error, "")
} }
func (c *ConversationGorm) Update(ctx context.Context, groups []*table.ConversationModel) (err error) { func (c *ConversationGorm) Update(ctx context.Context, conversations []*relation.ConversationModel, tx ...any) (err error) {
defer func() { defer func() {
tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "groups", groups) tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "conversations", conversations)
}() }()
return utils.Wrap(getDBConn(g.DB, tx).Updates(&groups).Error, "") return utils.Wrap(getDBConn(c.DB, tx).Updates(&conversations).Error, "")
} }
func (c *ConversationGorm) Find(ctx context.Context, groupIDs []string) (groups []*table.ConversationModel, err error) { func (c *ConversationGorm) Find(ctx context.Context, ownerUserID string, conversationIDs []string, tx ...any) (conversations []*relation.ConversationModel, err error) {
var newConversations []relation.ConversationModel
defer func() { defer func() {
tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "groupIDs", groupIDs, "groups", groups) tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "ownerUserID", ownerUserID, "groups", conversations)
}() }()
return groups, utils.Wrap(getDBConn(g.DB, tx).Where("group_id in (?)", groupIDs).Find(&groups).Error, "") err = utils.Wrap(getDBConn(c.DB, tx).Where("owner_user_id=? and conversation_id IN (?)", ownerUserID, conversationIDs).Find(&newConversations).Error, "")
for _, v := range newConversations {
v1 := v
conversations = append(conversations, &v1)
}
return conversations, err
} }
func (c *ConversationGorm) Take(ctx context.Context, groupID string) (group *table.ConversationModel, err error) { func (c *ConversationGorm) Take(ctx context.Context, userID, conversationID string, tx ...any) (conversation *relation.ConversationModel, err error) {
group = &Group{} cc := &relation.ConversationModel{}
defer func() {
tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "userID", userID, "conversation", *conversation)
}()
return cc, utils.Wrap(getDBConn(c.DB, tx).Where("conversation_id = ? And owner_user_id = ?", conversationID, userID).Take(cc).Error, "")
}
func (c *ConversationGorm) FindUserID(ctx context.Context, userIDList []string, conversationID string, tx ...any) (existUserID []string, err error) {
defer func() {
tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "userID", userIDList, "existUserID", existUserID)
}()
return existUserID, utils.Wrap(getDBConn(c.DB, tx).Where(" owner_user_id IN (?) and conversation_id=?", userIDList, conversationID).Pluck("owner_user_id", &existUserID).Error, "")
}
func (c *ConversationGorm) FindConversationID(ctx context.Context, userID string, conversationIDList []string, tx ...any) (existConversationID []string, err error) {
defer func() {
tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "userID", userID, "existConversationIDList", existConversationID)
}()
return existConversationID, utils.Wrap(getDBConn(c.DB, tx).Where(" conversation_id IN (?) and owner_user_id=?", conversationIDList, userID).Pluck("conversation_id", &existConversationID).Error, "")
}
func (c *ConversationGorm) FindUserIDAllConversationID(ctx context.Context, userID string, tx ...any) (conversationIDList []string, err error) {
defer func() { defer func() {
tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "groupID", groupID, "group", *group) tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "userID", userID, "conversationIDList", conversationIDList)
}() }()
return group, utils.Wrap(getDBConn(g.DB, tx).Where("group_id = ?", groupID).Take(group).Error, "") return conversationIDList, utils.Wrap(getDBConn(c.DB, tx).Model(&relation.ConversationModel{}).Where("owner_user_id=?", userID).Pluck("conversation_id", &conversationIDList).Error, "")
} }

Loading…
Cancel
Save