conversation

pull/1427/head
withchao 2 years ago
parent 55fade28db
commit 7d3aeceea6

@ -16,6 +16,9 @@ package conversation
import ( import (
"context" "context"
"errors"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/newmgo"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/unrelation"
"google.golang.org/grpc" "google.golang.org/grpc"
@ -54,7 +57,14 @@ func Start(client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) e
if err != nil { if err != nil {
return err return err
} }
conversationDB := relation.NewConversationGorm(db) mongo, err := unrelation.NewMongo()
if err != nil {
return err
}
conversationDB, err := newmgo.NewConversationMongo(mongo.GetDatabase())
if err != nil {
return err
}
groupRpcClient := rpcclient.NewGroupRpcClient(client) groupRpcClient := rpcclient.NewGroupRpcClient(client)
msgRpcClient := rpcclient.NewMessageRpcClient(client) msgRpcClient := rpcclient.NewMessageRpcClient(client)
pbconversation.RegisterConversationServer(server, &conversationServer{ pbconversation.RegisterConversationServer(server, &conversationServer{
@ -229,11 +239,12 @@ func (c *conversationServer) SetConversations(ctx context.Context,
// 获取超级大群开启免打扰的用户ID. // 获取超级大群开启免打扰的用户ID.
func (c *conversationServer) GetRecvMsgNotNotifyUserIDs(ctx context.Context, req *pbconversation.GetRecvMsgNotNotifyUserIDsReq) (*pbconversation.GetRecvMsgNotNotifyUserIDsResp, error) { func (c *conversationServer) GetRecvMsgNotNotifyUserIDs(ctx context.Context, req *pbconversation.GetRecvMsgNotNotifyUserIDsReq) (*pbconversation.GetRecvMsgNotNotifyUserIDsResp, error) {
userIDs, err := c.conversationDatabase.FindRecvMsgNotNotifyUserIDs(ctx, req.GroupID) //userIDs, err := c.conversationDatabase.FindRecvMsgNotNotifyUserIDs(ctx, req.GroupID)
if err != nil { //if err != nil {
return nil, err // return nil, err
} //}
return &pbconversation.GetRecvMsgNotNotifyUserIDsResp{UserIDs: userIDs}, nil //return &pbconversation.GetRecvMsgNotNotifyUserIDsResp{UserIDs: userIDs}, nil
return nil, errors.New("deprecated")
} }
// create conversation without notification for msg redis transfer. // create conversation without notification for msg redis transfer.

@ -16,6 +16,7 @@ package tools
import ( import (
"context" "context"
"github.com/OpenIMSDK/protocol/sdkws"
"math/rand" "math/rand"
"time" "time"
@ -91,7 +92,11 @@ func (c *MsgTool) ConversationsDestructMsgs() {
} }
for i := 0; i < count; i++ { for i := 0; i < count; i++ {
pageNumber := rand.Int63() % maxPage pageNumber := rand.Int63() % maxPage
conversationIDs, err := c.conversationDatabase.PageConversationIDs(ctx, int32(pageNumber), batchNum) pagination := &sdkws.RequestPagination{
PageNumber: int32(pageNumber),
ShowNumber: batchNum,
}
conversationIDs, err := c.conversationDatabase.PageConversationIDs(ctx, pagination)
if err != nil { if err != nil {
log.ZError(ctx, "PageConversationIDs failed", err, "pageNumber", pageNumber) log.ZError(ctx, "PageConversationIDs failed", err, "pageNumber", pageNumber)
continue continue

@ -17,6 +17,7 @@ package tools
import ( import (
"context" "context"
"fmt" "fmt"
"github.com/OpenIMSDK/protocol/sdkws"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/newmgo" "github.com/openimsdk/open-im-server/v3/pkg/common/db/newmgo"
"math" "math"
@ -106,10 +107,14 @@ func InitMsgTool() (*MsgTool, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
conversationDB, err := newmgo.NewConversationMongo(mongo.GetDatabase())
if err != nil {
return nil, err
}
groupDatabase := controller.NewGroupDatabase(rdb, groupDB, groupMemberDB, groupRequestDB, tx.NewMongo(mongo.GetClient()), nil) groupDatabase := controller.NewGroupDatabase(rdb, groupDB, groupMemberDB, groupRequestDB, tx.NewMongo(mongo.GetClient()), nil)
conversationDatabase := controller.NewConversationDatabase( conversationDatabase := controller.NewConversationDatabase(
relation.NewConversationGorm(db), conversationDB,
cache.NewConversationRedis(rdb, cache.GetDefaultOpt(), relation.NewConversationGorm(db)), cache.NewConversationRedis(rdb, cache.GetDefaultOpt(), conversationDB),
tx.NewGorm(db), tx.NewGorm(db),
) )
msgRpcClient := rpcclient.NewMessageRpcClient(discov) msgRpcClient := rpcclient.NewMessageRpcClient(discov)
@ -156,7 +161,11 @@ func (c *MsgTool) AllConversationClearMsgAndFixSeq() {
} }
for i := 0; i < count; i++ { for i := 0; i < count; i++ {
pageNumber := rand.Int63() % maxPage pageNumber := rand.Int63() % maxPage
conversationIDs, err := c.conversationDatabase.PageConversationIDs(ctx, int32(pageNumber), batchNum) pagination := &sdkws.RequestPagination{
PageNumber: int32(pageNumber),
ShowNumber: batchNum,
}
conversationIDs, err := c.conversationDatabase.PageConversationIDs(ctx, pagination)
if err != nil { if err != nil {
log.ZError(ctx, "PageConversationIDs failed", err, "pageNumber", pageNumber) log.ZError(ctx, "PageConversationIDs failed", err, "pageNumber", pageNumber)
continue continue

@ -26,7 +26,6 @@ import (
"github.com/OpenIMSDK/tools/utils" "github.com/OpenIMSDK/tools/utils"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/relation"
relationtb "github.com/openimsdk/open-im-server/v3/pkg/common/db/table/relation" relationtb "github.com/openimsdk/open-im-server/v3/pkg/common/db/table/relation"
) )
@ -67,10 +66,10 @@ type ConversationCache interface {
GetUserRecvMsgOpt(ctx context.Context, ownerUserID, conversationID string) (opt int, err error) GetUserRecvMsgOpt(ctx context.Context, ownerUserID, conversationID string) (opt int, err error)
DelUserRecvMsgOpt(ownerUserID, conversationID string) ConversationCache DelUserRecvMsgOpt(ownerUserID, conversationID string) ConversationCache
// get one super group recv msg but do not notification userID list // get one super group recv msg but do not notification userID list
GetSuperGroupRecvMsgNotNotifyUserIDs(ctx context.Context, groupID string) (userIDs []string, err error) //GetSuperGroupRecvMsgNotNotifyUserIDs(ctx context.Context, groupID string) (userIDs []string, err error)
DelSuperGroupRecvMsgNotNotifyUserIDs(groupID string) ConversationCache DelSuperGroupRecvMsgNotNotifyUserIDs(groupID string) ConversationCache
// get one super group recv msg but do not notification userID list hash // get one super group recv msg but do not notification userID list hash
GetSuperGroupRecvMsgNotNotifyUserIDsHash(ctx context.Context, groupID string) (hash uint64, err error) //GetSuperGroupRecvMsgNotNotifyUserIDsHash(ctx context.Context, groupID string) (hash uint64, err error)
DelSuperGroupRecvMsgNotNotifyUserIDsHash(groupID string) ConversationCache DelSuperGroupRecvMsgNotNotifyUserIDsHash(groupID string) ConversationCache
//GetUserAllHasReadSeqs(ctx context.Context, ownerUserID string) (map[string]int64, error) //GetUserAllHasReadSeqs(ctx context.Context, ownerUserID string) (map[string]int64, error)
@ -101,20 +100,20 @@ type ConversationRedisCache struct {
expireTime time.Duration expireTime time.Duration
} }
func NewNewConversationRedis( //func NewNewConversationRedis(
rdb redis.UniversalClient, // rdb redis.UniversalClient,
conversationDB *relation.ConversationGorm, // conversationDB *relation.ConversationGorm,
options rockscache.Options, // options rockscache.Options,
) ConversationCache { //) ConversationCache {
rcClient := rockscache.NewClient(rdb, options) // rcClient := rockscache.NewClient(rdb, options)
//
return &ConversationRedisCache{ // return &ConversationRedisCache{
rcClient: rcClient, // rcClient: rcClient,
metaCache: NewMetaCacheRedis(rcClient), // metaCache: NewMetaCacheRedis(rcClient),
conversationDB: conversationDB, // conversationDB: conversationDB,
expireTime: conversationExpireTime, // expireTime: conversationExpireTime,
} // }
} //}
func (c *ConversationRedisCache) NewCache() ConversationCache { func (c *ConversationRedisCache) NewCache() ConversationCache {
return &ConversationRedisCache{ return &ConversationRedisCache{
@ -282,11 +281,11 @@ func (c *ConversationRedisCache) GetUserRecvMsgOpt(ctx context.Context, ownerUse
}) })
} }
func (c *ConversationRedisCache) GetSuperGroupRecvMsgNotNotifyUserIDs(ctx context.Context, groupID string) (userIDs []string, err error) { //func (c *ConversationRedisCache) GetSuperGroupRecvMsgNotNotifyUserIDs(ctx context.Context, groupID string) (userIDs []string, err error) {
return getCache(ctx, c.rcClient, c.getSuperGroupRecvNotNotifyUserIDsKey(groupID), c.expireTime, func(ctx context.Context) (userIDs []string, err error) { // return getCache(ctx, c.rcClient, c.getSuperGroupRecvNotNotifyUserIDsKey(groupID), c.expireTime, func(ctx context.Context) (userIDs []string, err error) {
return c.conversationDB.FindSuperGroupRecvMsgNotNotifyUserIDs(ctx, groupID) // return c.conversationDB.FindSuperGroupRecvMsgNotNotifyUserIDs(ctx, groupID)
}) // })
} //}
func (c *ConversationRedisCache) DelUsersConversation(conversationID string, ownerUserIDs ...string) ConversationCache { func (c *ConversationRedisCache) DelUsersConversation(conversationID string, ownerUserIDs ...string) ConversationCache {
keys := make([]string, 0, len(ownerUserIDs)) keys := make([]string, 0, len(ownerUserIDs))
@ -313,19 +312,19 @@ func (c *ConversationRedisCache) DelSuperGroupRecvMsgNotNotifyUserIDs(groupID st
return cache return cache
} }
func (c *ConversationRedisCache) GetSuperGroupRecvMsgNotNotifyUserIDsHash(ctx context.Context, groupID string) (hash uint64, err error) { //func (c *ConversationRedisCache) GetSuperGroupRecvMsgNotNotifyUserIDsHash(ctx context.Context, groupID string) (hash uint64, err error) {
return getCache(ctx, c.rcClient, c.getSuperGroupRecvNotNotifyUserIDsHashKey(groupID), c.expireTime, func(ctx context.Context) (hash uint64, err error) { // return getCache(ctx, c.rcClient, c.getSuperGroupRecvNotNotifyUserIDsHashKey(groupID), c.expireTime, func(ctx context.Context) (hash uint64, err error) {
userIDs, err := c.GetSuperGroupRecvMsgNotNotifyUserIDs(ctx, groupID) // userIDs, err := c.GetSuperGroupRecvMsgNotNotifyUserIDs(ctx, groupID)
if err != nil { // if err != nil {
return 0, err // return 0, err
} // }
utils.Sort(userIDs, true) // utils.Sort(userIDs, true)
bi := big.NewInt(0) // bi := big.NewInt(0)
bi.SetString(utils.Md5(strings.Join(userIDs, ";"))[0:8], 16) // bi.SetString(utils.Md5(strings.Join(userIDs, ";"))[0:8], 16)
return bi.Uint64(), nil // return bi.Uint64(), nil
}, // },
) // )
} //}
func (c *ConversationRedisCache) DelSuperGroupRecvMsgNotNotifyUserIDsHash(groupID string) ConversationCache { func (c *ConversationRedisCache) DelSuperGroupRecvMsgNotNotifyUserIDsHash(groupID string) ConversationCache {
cache := c.NewCache() cache := c.NewCache()

@ -16,6 +16,7 @@ package controller
import ( import (
"context" "context"
"github.com/openimsdk/open-im-server/v3/pkg/common/pagination"
"time" "time"
"github.com/openimsdk/open-im-server/v3/pkg/msgprocessor" "github.com/openimsdk/open-im-server/v3/pkg/msgprocessor"
@ -39,7 +40,7 @@ type ConversationDatabase interface {
// FindConversations 根据会话ID获取某个用户的多个会话 // FindConversations 根据会话ID获取某个用户的多个会话
FindConversations(ctx context.Context, ownerUserID string, conversationIDs []string) ([]*relationtb.ConversationModel, error) FindConversations(ctx context.Context, ownerUserID string, conversationIDs []string) ([]*relationtb.ConversationModel, error)
// FindRecvMsgNotNotifyUserIDs 获取超级大群开启免打扰的用户ID // FindRecvMsgNotNotifyUserIDs 获取超级大群开启免打扰的用户ID
FindRecvMsgNotNotifyUserIDs(ctx context.Context, groupID string) ([]string, error) //FindRecvMsgNotNotifyUserIDs(ctx context.Context, groupID string) ([]string, error)
// GetUserAllConversation 获取一个用户在服务器上所有的会话 // GetUserAllConversation 获取一个用户在服务器上所有的会话
GetUserAllConversation(ctx context.Context, ownerUserID string) ([]*relationtb.ConversationModel, error) GetUserAllConversation(ctx context.Context, ownerUserID string) ([]*relationtb.ConversationModel, error)
// SetUserConversations 设置用户多个会话属性,如果会话不存在则创建,否则更新,内部保证原子性 // SetUserConversations 设置用户多个会话属性,如果会话不存在则创建,否则更新,内部保证原子性
@ -51,7 +52,7 @@ type ConversationDatabase interface {
GetUserConversationIDsHash(ctx context.Context, ownerUserID string) (hash uint64, err error) GetUserConversationIDsHash(ctx context.Context, ownerUserID string) (hash uint64, err error)
GetAllConversationIDs(ctx context.Context) ([]string, error) GetAllConversationIDs(ctx context.Context) ([]string, error)
GetAllConversationIDsNumber(ctx context.Context) (int64, error) GetAllConversationIDsNumber(ctx context.Context) (int64, error)
PageConversationIDs(ctx context.Context, pageNumber, showNumber int32) (conversationIDs []string, err error) PageConversationIDs(ctx context.Context, pagination pagination.Pagination) (conversationIDs []string, err error)
//GetUserAllHasReadSeqs(ctx context.Context, ownerUserID string) (map[string]int64, error) //GetUserAllHasReadSeqs(ctx context.Context, ownerUserID string) (map[string]int64, error)
GetConversationsByConversationID(ctx context.Context, conversationIDs []string) ([]*relationtb.ConversationModel, error) GetConversationsByConversationID(ctx context.Context, conversationIDs []string) ([]*relationtb.ConversationModel, error)
GetConversationIDsNeedDestruct(ctx context.Context) ([]*relationtb.ConversationModel, error) GetConversationIDsNeedDestruct(ctx context.Context) ([]*relationtb.ConversationModel, error)
@ -255,9 +256,9 @@ func (c *conversationDatabase) SetUserConversations(ctx context.Context, ownerUs
return cache.ExecDel(ctx) return cache.ExecDel(ctx)
} }
func (c *conversationDatabase) FindRecvMsgNotNotifyUserIDs(ctx context.Context, groupID string) ([]string, error) { //func (c *conversationDatabase) FindRecvMsgNotNotifyUserIDs(ctx context.Context, groupID string) ([]string, error) {
return c.cache.GetSuperGroupRecvMsgNotNotifyUserIDs(ctx, groupID) // return c.cache.GetSuperGroupRecvMsgNotNotifyUserIDs(ctx, groupID)
} //}
func (c *conversationDatabase) CreateGroupChatConversation(ctx context.Context, groupID string, userIDs []string) error { func (c *conversationDatabase) CreateGroupChatConversation(ctx context.Context, groupID string, userIDs []string) error {
cache := c.cache.NewCache() cache := c.cache.NewCache()
@ -311,14 +312,10 @@ func (c *conversationDatabase) GetAllConversationIDsNumber(ctx context.Context)
return c.conversationDB.GetAllConversationIDsNumber(ctx) return c.conversationDB.GetAllConversationIDsNumber(ctx)
} }
func (c *conversationDatabase) PageConversationIDs(ctx context.Context, pageNumber, showNumber int32) ([]string, error) { func (c *conversationDatabase) PageConversationIDs(ctx context.Context, pagination pagination.Pagination) ([]string, error) {
return c.conversationDB.PageConversationIDs(ctx, pageNumber, showNumber) return c.conversationDB.PageConversationIDs(ctx, pagination)
} }
//func (c *conversationDatabase) GetUserAllHasReadSeqs(ctx context.Context, ownerUserID string) (map[string]int64, error) {
// return c.cache.GetUserAllHasReadSeqs(ctx, ownerUserID)
//}
func (c *conversationDatabase) GetConversationsByConversationID(ctx context.Context, conversationIDs []string) ([]*relationtb.ConversationModel, error) { func (c *conversationDatabase) GetConversationsByConversationID(ctx context.Context, conversationIDs []string) ([]*relationtb.ConversationModel, error) {
return c.conversationDB.GetConversationsByConversationID(ctx, conversationIDs) return c.conversationDB.GetConversationsByConversationID(ctx, conversationIDs)
} }

@ -0,0 +1,134 @@
package newmgo
import (
"context"
"github.com/OpenIMSDK/protocol/constant"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/newmgo/mgotool"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/table/relation"
"github.com/openimsdk/open-im-server/v3/pkg/common/pagination"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"time"
)
func NewConversationMongo(db *mongo.Database) (*ConversationMgo, error) {
return &ConversationMgo{
coll: db.Collection("conversation"),
}, nil
}
type ConversationMgo struct {
coll *mongo.Collection
}
func (c *ConversationMgo) Create(ctx context.Context, conversations []*relation.ConversationModel) (err error) {
return mgotool.InsertMany(ctx, c.coll, conversations)
}
func (c *ConversationMgo) Delete(ctx context.Context, groupIDs []string) (err error) {
return mgotool.DeleteMany(ctx, c.coll, bson.M{"group_id": bson.M{"$in": groupIDs}})
}
func (c *ConversationMgo) UpdateByMap(ctx context.Context, userIDs []string, conversationID string, args map[string]any) (rows int64, err error) {
res, err := mgotool.UpdateMany(ctx, c.coll, bson.M{"owner_user_id": bson.M{"$in": userIDs}, "conversation_id": conversationID}, bson.M{"$set": args})
if err != nil {
return 0, err
}
return res.ModifiedCount, nil
}
func (c *ConversationMgo) Update(ctx context.Context, conversation *relation.ConversationModel) (err error) {
return mgotool.UpdateOne(ctx, c.coll, bson.M{"owner_user_id": conversation.OwnerUserID, "conversation_id": conversation.ConversationID}, bson.M{"$set": conversation}, true)
}
func (c *ConversationMgo) Find(ctx context.Context, ownerUserID string, conversationIDs []string) (conversations []*relation.ConversationModel, err error) {
return mgotool.Find[*relation.ConversationModel](ctx, c.coll, bson.M{"owner_user_id": ownerUserID, "conversation_id": bson.M{"$in": conversationIDs}})
}
func (c *ConversationMgo) FindUserID(ctx context.Context, userIDs []string, conversationIDs []string) ([]string, error) {
return mgotool.Find[string](ctx, c.coll, bson.M{"owner_user_id": bson.M{"$in": userIDs}, "conversation_id": bson.M{"$in": conversationIDs}}, options.Find().SetProjection(bson.M{"owner_user_id": 1}))
}
func (c *ConversationMgo) FindUserIDAllConversationID(ctx context.Context, userID string) ([]string, error) {
return mgotool.Find[string](ctx, c.coll, bson.M{"owner_user_id": userID}, options.Find().SetProjection(bson.M{"conversation_id": 1}))
}
func (c *ConversationMgo) Take(ctx context.Context, userID, conversationID string) (conversation *relation.ConversationModel, err error) {
return mgotool.FindOne[*relation.ConversationModel](ctx, c.coll, bson.M{"owner_user_id": userID, "conversation_id": conversationID})
}
func (c *ConversationMgo) FindConversationID(ctx context.Context, userID string, conversationIDs []string) (existConversationID []string, err error) {
return mgotool.Find[string](ctx, c.coll, bson.M{"owner_user_id": userID, "conversation_id": bson.M{"$in": conversationIDs}}, options.Find().SetProjection(bson.M{"conversation_id": 1}))
}
func (c *ConversationMgo) FindUserIDAllConversations(ctx context.Context, userID string) (conversations []*relation.ConversationModel, err error) {
return mgotool.Find[*relation.ConversationModel](ctx, c.coll, bson.M{"owner_user_id": userID})
}
func (c *ConversationMgo) FindRecvMsgNotNotifyUserIDs(ctx context.Context, groupID string) ([]string, error) {
return mgotool.Find[string](ctx, c.coll, bson.M{"group_id": groupID, "recv_msg_opt": constant.ReceiveNotNotifyMessage}, options.Find().SetProjection(bson.M{"owner_user_id": 1}))
}
func (c *ConversationMgo) GetUserRecvMsgOpt(ctx context.Context, ownerUserID, conversationID string) (opt int, err error) {
return mgotool.FindOne[int](ctx, c.coll, bson.M{"owner_user_id": ownerUserID, "conversation_id": conversationID}, options.FindOne().SetProjection(bson.M{"recv_msg_opt": 1}))
}
func (c *ConversationMgo) GetAllConversationIDs(ctx context.Context) ([]string, error) {
return mgotool.Aggregate[string](ctx, c.coll, []bson.M{
{"$group": bson.M{"_id": "$conversation_id"}},
{"$project": bson.M{"_id": 0, "conversation_id": "$_id"}},
})
}
func (c *ConversationMgo) GetAllConversationIDsNumber(ctx context.Context) (int64, error) {
counts, err := mgotool.Aggregate[int64](ctx, c.coll, []bson.M{
{"$group": bson.M{"_id": "$conversation_id"}},
{"$project": bson.M{"_id": 0, "conversation_id": "$_id"}},
})
if err != nil {
return 0, err
}
if len(counts) == 0 {
return 0, nil
}
return counts[0], nil
}
func (c *ConversationMgo) PageConversationIDs(ctx context.Context, pagination pagination.Pagination) (conversationIDs []string, err error) {
return mgotool.FindPageOnly[string](ctx, c.coll, bson.M{}, pagination, options.Find().SetProjection(bson.M{"conversation_id": 1}))
}
func (c *ConversationMgo) GetConversationsByConversationID(ctx context.Context, conversationIDs []string) ([]*relation.ConversationModel, error) {
return mgotool.Find[*relation.ConversationModel](ctx, c.coll, bson.M{"conversation_id": bson.M{"$in": conversationIDs}})
}
func (c *ConversationMgo) GetConversationIDsNeedDestruct(ctx context.Context) ([]*relation.ConversationModel, error) {
//"is_msg_destruct = 1 && msg_destruct_time != 0 && (UNIX_TIMESTAMP(NOW()) > (msg_destruct_time + UNIX_TIMESTAMP(latest_msg_destruct_time)) || latest_msg_destruct_time is NULL)"
return mgotool.Find[*relation.ConversationModel](ctx, c.coll, bson.M{
"is_msg_destruct": 1,
"msg_destruct_time": bson.M{"$ne": 0},
"$or": []bson.M{
{
"$expr": bson.M{
"$gt": []any{
time.Now(),
bson.M{"$add": []any{"$msg_destruct_time", "$latest_msg_destruct_time"}},
},
},
},
{
"latest_msg_destruct_time": nil,
},
},
})
}
func (c *ConversationMgo) GetConversationNotReceiveMessageUserIDs(ctx context.Context, conversationID string) ([]string, error) {
return mgotool.Find[string](ctx, c.coll, bson.M{"conversation_id": conversationID, "recv_msg_opt": bson.M{"$ne": constant.ReceiveMessage}}, options.Find().SetProjection(bson.M{"owner_user_id": 1}))
}
func (c *ConversationMgo) NewTx(tx any) relation.ConversationModelInterface {
//TODO implement me
panic("implement me")
}

@ -48,6 +48,14 @@ func UpdateOne(ctx context.Context, coll *mongo.Collection, filter any, update a
return nil return nil
} }
func UpdateMany(ctx context.Context, coll *mongo.Collection, filter any, update any, opts ...*options.UpdateOptions) (*mongo.UpdateResult, error) {
res, err := coll.UpdateMany(ctx, filter, update, opts...)
if err != nil {
return nil, errs.Wrap(err)
}
return res, nil
}
func Find[T any](ctx context.Context, coll *mongo.Collection, filter any, opts ...*options.FindOptions) ([]T, error) { func Find[T any](ctx context.Context, coll *mongo.Collection, filter any, opts ...*options.FindOptions) ([]T, error) {
cur, err := coll.Find(ctx, filter, opts...) cur, err := coll.Find(ctx, filter, opts...)
if err != nil { if err != nil {
@ -92,6 +100,15 @@ func FindPage[T any](ctx context.Context, coll *mongo.Collection, filter any, pa
return count, res, nil return count, res, nil
} }
func FindPageOnly[T any](ctx context.Context, coll *mongo.Collection, filter any, pagination pagination.Pagination, opts ...*options.FindOptions) ([]T, error) {
skip := int64(pagination.GetPageNumber()-1) * int64(pagination.GetShowNumber())
if skip < 0 || pagination.GetShowNumber() <= 0 {
return nil, nil
}
opt := options.Find().SetSkip(skip).SetLimit(int64(pagination.GetShowNumber()))
return Find[T](ctx, coll, filter, append(opts, opt)...)
}
func Count(ctx context.Context, coll *mongo.Collection, filter any, opts ...*options.CountOptions) (int64, error) { func Count(ctx context.Context, coll *mongo.Collection, filter any, opts ...*options.CountOptions) (int64, error) {
return coll.CountDocuments(ctx, filter, opts...) return coll.CountDocuments(ctx, filter, opts...)
} }
@ -119,6 +136,14 @@ func DeleteMany(ctx context.Context, coll *mongo.Collection, filter any, opts ..
return nil return nil
} }
//func Upsert[T any](ctx context.Context, coll *mongo.Collection, val *T, opts ...*options.InsertManyOptions) error { func Aggregate[T any](ctx context.Context, coll *mongo.Collection, pipeline any, opts ...*options.AggregateOptions) ([]T, error) {
// return nil cur, err := coll.Aggregate(ctx, pipeline, opts...)
//} if err != nil {
return nil, err
}
var ts []T
if err := cur.All(ctx, &ts); err != nil {
return nil, err
}
return ts, nil
}

@ -14,237 +14,231 @@
package relation package relation
import ( //
"context" //import (
// "context"
"github.com/OpenIMSDK/tools/errs" //
"gorm.io/gorm" // "github.com/OpenIMSDK/tools/errs"
// "gorm.io/gorm"
"github.com/OpenIMSDK/protocol/constant" //
"github.com/OpenIMSDK/tools/utils" // "github.com/OpenIMSDK/protocol/constant"
// "github.com/OpenIMSDK/tools/utils"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/table/relation" //
) // "github.com/openimsdk/open-im-server/v3/pkg/common/db/table/relation"
//)
type ConversationGorm struct { //
*MetaDB //type ConversationGorm struct {
} // *MetaDB
//}
func NewConversationGorm(db *gorm.DB) relation.ConversationModelInterface { //
return &ConversationGorm{NewMetaDB(db, &relation.ConversationModel{})} //func NewConversationGorm(db *gorm.DB) relation.ConversationModelInterface {
} // return &ConversationGorm{NewMetaDB(db, &relation.ConversationModel{})}
//}
func (c *ConversationGorm) NewTx(tx any) relation.ConversationModelInterface { //
return &ConversationGorm{NewMetaDB(tx.(*gorm.DB), &relation.ConversationModel{})} //func (c *ConversationGorm) NewTx(tx any) relation.ConversationModelInterface {
} // return &ConversationGorm{NewMetaDB(tx.(*gorm.DB), &relation.ConversationModel{})}
//}
func (c *ConversationGorm) Create(ctx context.Context, conversations []*relation.ConversationModel) (err error) { //
return utils.Wrap(c.db(ctx).Create(&conversations).Error, "") //func (c *ConversationGorm) Create(ctx context.Context, conversations []*relation.ConversationModel) (err error) {
} // return utils.Wrap(c.db(ctx).Create(&conversations).Error, "")
//}
func (c *ConversationGorm) Delete(ctx context.Context, groupIDs []string) (err error) { //
return utils.Wrap(c.db(ctx).Where("group_id in (?)", groupIDs).Delete(&relation.ConversationModel{}).Error, "") //func (c *ConversationGorm) Delete(ctx context.Context, groupIDs []string) (err error) {
} // return utils.Wrap(c.db(ctx).Where("group_id in (?)", groupIDs).Delete(&relation.ConversationModel{}).Error, "")
//}
func (c *ConversationGorm) UpdateByMap( //
ctx context.Context, //func (c *ConversationGorm) UpdateByMap(
userIDList []string, // ctx context.Context,
conversationID string, // userIDList []string,
args map[string]any, // conversationID string,
) (rows int64, err error) { // args map[string]any,
result := c.db(ctx).Where("owner_user_id IN (?) and conversation_id=?", userIDList, conversationID).Updates(args) //) (rows int64, err error) {
return result.RowsAffected, utils.Wrap(result.Error, "") // result := c.db(ctx).Where("owner_user_id IN (?) and conversation_id=?", userIDList, conversationID).Updates(args)
} // return result.RowsAffected, utils.Wrap(result.Error, "")
//}
func (c *ConversationGorm) Update(ctx context.Context, conversation *relation.ConversationModel) (err error) { //
return utils.Wrap( //func (c *ConversationGorm) Update(ctx context.Context, conversation *relation.ConversationModel) (err error) {
c.db(ctx). // return utils.Wrap(
Where("owner_user_id = ? and conversation_id = ?", conversation.OwnerUserID, conversation.ConversationID). // c.db(ctx).
Updates(conversation). // Where("owner_user_id = ? and conversation_id = ?", conversation.OwnerUserID, conversation.ConversationID).
Error, // Updates(conversation).
"", // Error,
) // "",
} // )
//}
func (c *ConversationGorm) Find( //
ctx context.Context, //func (c *ConversationGorm) Find(
ownerUserID string, // ctx context.Context,
conversationIDs []string, // ownerUserID string,
) (conversations []*relation.ConversationModel, err error) { // conversationIDs []string,
err = utils.Wrap( //) (conversations []*relation.ConversationModel, err error) {
c.db(ctx). // err = utils.Wrap(
Where("owner_user_id=? and conversation_id IN (?)", ownerUserID, conversationIDs). // c.db(ctx).
Find(&conversations). // Where("owner_user_id=? and conversation_id IN (?)", ownerUserID, conversationIDs).
Error, // Find(&conversations).
"", // Error,
) // "",
return conversations, err // )
} // return conversations, err
//}
func (c *ConversationGorm) Take( //
ctx context.Context, //func (c *ConversationGorm) Take(
userID, conversationID string, // ctx context.Context,
) (conversation *relation.ConversationModel, err error) { // userID, conversationID string,
cc := &relation.ConversationModel{} //) (conversation *relation.ConversationModel, err error) {
return cc, utils.Wrap( // cc := &relation.ConversationModel{}
c.db(ctx).Where("conversation_id = ? And owner_user_id = ?", conversationID, userID).Take(cc).Error, // return cc, utils.Wrap(
"", // c.db(ctx).Where("conversation_id = ? And owner_user_id = ?", conversationID, userID).Take(cc).Error,
) // "",
} // )
//}
func (c *ConversationGorm) FindUserID( //
ctx context.Context, //func (c *ConversationGorm) FindUserID(
userIDs []string, // ctx context.Context,
conversationIDs []string, // userIDs []string,
) (existUserID []string, err error) { // conversationIDs []string,
return existUserID, utils.Wrap( //) (existUserID []string, err error) {
c.db(ctx). // return existUserID, utils.Wrap(
Where(" owner_user_id IN (?) and conversation_id in (?)", userIDs, conversationIDs). // c.db(ctx).
Pluck("owner_user_id", &existUserID). // Where(" owner_user_id IN (?) and conversation_id in (?)", userIDs, conversationIDs).
Error, // Pluck("owner_user_id", &existUserID).
"", // Error,
) // "",
} // )
//}
func (c *ConversationGorm) FindConversationID( //
ctx context.Context, //func (c *ConversationGorm) FindConversationID(
userID string, // ctx context.Context,
conversationIDList []string, // userID string,
) (existConversationID []string, err error) { // conversationIDList []string,
return existConversationID, utils.Wrap( //) (existConversationID []string, err error) {
c.db(ctx). // return existConversationID, utils.Wrap(
Where(" conversation_id IN (?) and owner_user_id=?", conversationIDList, userID). // c.db(ctx).
Pluck("conversation_id", &existConversationID). // Where(" conversation_id IN (?) and owner_user_id=?", conversationIDList, userID).
Error, // Pluck("conversation_id", &existConversationID).
"", // Error,
) // "",
} // )
//}
func (c *ConversationGorm) FindUserIDAllConversationID( //
ctx context.Context, //func (c *ConversationGorm) FindUserIDAllConversationID(
userID string, // ctx context.Context,
) (conversationIDList []string, err error) { // userID string,
return conversationIDList, utils.Wrap( //) (conversationIDList []string, err error) {
c.db(ctx).Where("owner_user_id=?", userID).Pluck("conversation_id", &conversationIDList).Error, // return conversationIDList, utils.Wrap(
"", // c.db(ctx).Where("owner_user_id=?", userID).Pluck("conversation_id", &conversationIDList).Error,
) // "",
} // )
//}
func (c *ConversationGorm) FindUserIDAllConversations( //
ctx context.Context, //func (c *ConversationGorm) FindUserIDAllConversations(
userID string, // ctx context.Context,
) (conversations []*relation.ConversationModel, err error) { // userID string,
return conversations, utils.Wrap(c.db(ctx).Where("owner_user_id=?", userID).Find(&conversations).Error, "") //) (conversations []*relation.ConversationModel, err error) {
} // return conversations, utils.Wrap(c.db(ctx).Where("owner_user_id=?", userID).Find(&conversations).Error, "")
//}
func (c *ConversationGorm) FindRecvMsgNotNotifyUserIDs( //
ctx context.Context, //func (c *ConversationGorm) FindRecvMsgNotNotifyUserIDs(
groupID string, // ctx context.Context,
) (userIDs []string, err error) { // groupID string,
return userIDs, utils.Wrap( //) (userIDs []string, err error) {
c.db(ctx). // return userIDs, utils.Wrap(
Where("group_id = ? and recv_msg_opt = ?", groupID, constant.ReceiveNotNotifyMessage). // c.db(ctx).
Pluck("owner_user_id", &userIDs). // Where("group_id = ? and recv_msg_opt = ?", groupID, constant.ReceiveNotNotifyMessage).
Error, // Pluck("owner_user_id", &userIDs).
"", // Error,
) // "",
} // )
//}
func (c *ConversationGorm) FindSuperGroupRecvMsgNotNotifyUserIDs( //
ctx context.Context, //func (c *ConversationGorm) FindSuperGroupRecvMsgNotNotifyUserIDs(
groupID string, // ctx context.Context,
) (userIDs []string, err error) { // groupID string,
return userIDs, utils.Wrap( //) (userIDs []string, err error) {
c.db(ctx). // return userIDs, utils.Wrap(
Where("group_id = ? and recv_msg_opt = ? and conversation_type = ?", groupID, constant.ReceiveNotNotifyMessage, constant.SuperGroupChatType). // c.db(ctx).
Pluck("owner_user_id", &userIDs). // Where("group_id = ? and recv_msg_opt = ? and conversation_type = ?", groupID, constant.ReceiveNotNotifyMessage, constant.SuperGroupChatType).
Error, // Pluck("owner_user_id", &userIDs).
"", // Error,
) // "",
} // )
//}
func (c *ConversationGorm) GetUserRecvMsgOpt( //
ctx context.Context, //func (c *ConversationGorm) GetUserRecvMsgOpt(
ownerUserID, conversationID string, // ctx context.Context,
) (opt int, err error) { // ownerUserID, conversationID string,
var conversation relation.ConversationModel //) (opt int, err error) {
return int( // var conversation relation.ConversationModel
conversation.RecvMsgOpt, // return int(
), utils.Wrap( // conversation.RecvMsgOpt,
c.db(ctx). // ), utils.Wrap(
Where("conversation_id = ? And owner_user_id = ?", conversationID, ownerUserID). // c.db(ctx).
Select("recv_msg_opt"). // Where("conversation_id = ? And owner_user_id = ?", conversationID, ownerUserID).
Find(&conversation). // Select("recv_msg_opt").
Error, // Find(&conversation).
"", // Error,
) // "",
} // )
//}
func (c *ConversationGorm) GetAllConversationIDs(ctx context.Context) (conversationIDs []string, err error) { //
return conversationIDs, utils.Wrap( //func (c *ConversationGorm) GetAllConversationIDs(ctx context.Context) (conversationIDs []string, err error) {
c.db(ctx).Distinct("conversation_id").Pluck("conversation_id", &conversationIDs).Error, // return conversationIDs, utils.Wrap(
"", // c.db(ctx).Distinct("conversation_id").Pluck("conversation_id", &conversationIDs).Error,
) // "",
} // )
//}
func (c *ConversationGorm) GetAllConversationIDsNumber(ctx context.Context) (int64, error) { //
var num int64 //func (c *ConversationGorm) GetAllConversationIDsNumber(ctx context.Context) (int64, error) {
err := c.db(ctx).Select("COUNT(DISTINCT conversation_id)").Model(&relation.ConversationModel{}).Count(&num).Error // var num int64
return num, errs.Wrap(err) // err := c.db(ctx).Select("COUNT(DISTINCT conversation_id)").Model(&relation.ConversationModel{}).Count(&num).Error
} // return num, errs.Wrap(err)
//}
func (c *ConversationGorm) PageConversationIDs(ctx context.Context, pageNumber, showNumber int32) (conversationIDs []string, err error) { //
err = c.db(ctx).Distinct("conversation_id").Limit(int(showNumber)).Offset(int((pageNumber-1)*showNumber)).Pluck("conversation_id", &conversationIDs).Error //func (c *ConversationGorm) PageConversationIDs(ctx context.Context, pageNumber, showNumber int32) (conversationIDs []string, err error) {
err = errs.Wrap(err) // err = c.db(ctx).Distinct("conversation_id").Limit(int(showNumber)).Offset(int((pageNumber-1)*showNumber)).Pluck("conversation_id", &conversationIDs).Error
return // err = errs.Wrap(err)
} // return
//}
func (c *ConversationGorm) GetUserAllHasReadSeqs( //
ctx context.Context, //func (c *ConversationGorm) GetConversationsByConversationID(
ownerUserID string, // ctx context.Context,
) (hasReadSeqs map[string]int64, err error) { // conversationIDs []string,
return nil, nil //) (conversations []*relation.ConversationModel, err error) {
} // return conversations, utils.Wrap(
// c.db(ctx).Where("conversation_id IN (?)", conversationIDs).Find(&conversations).Error,
func (c *ConversationGorm) GetConversationsByConversationID( // "",
ctx context.Context, // )
conversationIDs []string, //}
) (conversations []*relation.ConversationModel, err error) { //
return conversations, utils.Wrap( //func (c *ConversationGorm) GetConversationIDsNeedDestruct(
c.db(ctx).Where("conversation_id IN (?)", conversationIDs).Find(&conversations).Error, // ctx context.Context,
"", //) (conversations []*relation.ConversationModel, err error) {
) // return conversations, utils.Wrap(
} // c.db(ctx).
// Where("is_msg_destruct = 1 && msg_destruct_time != 0 && (UNIX_TIMESTAMP(NOW()) > (msg_destruct_time + UNIX_TIMESTAMP(latest_msg_destruct_time)) || latest_msg_destruct_time is NULL)").
func (c *ConversationGorm) GetConversationIDsNeedDestruct( // Find(&conversations).
ctx context.Context, // Error,
) (conversations []*relation.ConversationModel, err error) { // "",
return conversations, utils.Wrap( // )
c.db(ctx). //}
Where("is_msg_destruct = 1 && msg_destruct_time != 0 && (UNIX_TIMESTAMP(NOW()) > (msg_destruct_time + UNIX_TIMESTAMP(latest_msg_destruct_time)) || latest_msg_destruct_time is NULL)"). //
Find(&conversations). //func (c *ConversationGorm) GetConversationRecvMsgOpt(ctx context.Context, userID string, conversationID string) (int32, error) {
Error, // var recvMsgOpt int32
"", // return recvMsgOpt, errs.Wrap(
) // c.db(ctx).
} // Model(&relation.ConversationModel{}).
// Where("conversation_id = ? and owner_user_id in ?", conversationID, userID).
func (c *ConversationGorm) GetConversationRecvMsgOpt(ctx context.Context, userID string, conversationID string) (int32, error) { // Pluck("recv_msg_opt", &recvMsgOpt).
var recvMsgOpt int32 // Error,
return recvMsgOpt, errs.Wrap( // )
c.db(ctx). //}
Model(&relation.ConversationModel{}). //
Where("conversation_id = ? and owner_user_id in ?", conversationID, userID). //func (c *ConversationGorm) GetConversationNotReceiveMessageUserIDs(ctx context.Context, conversationID string) ([]string, error) {
Pluck("recv_msg_opt", &recvMsgOpt). // var userIDs []string
Error, // return userIDs, errs.Wrap(
) // c.db(ctx).
} // Model(&relation.ConversationModel{}).
// Where("conversation_id = ? and recv_msg_opt <> ?", conversationID, constant.ReceiveMessage).
func (c *ConversationGorm) GetConversationNotReceiveMessageUserIDs(ctx context.Context, conversationID string) ([]string, error) { // Pluck("owner_user_id", &userIDs).Error,
var userIDs []string // )
return userIDs, errs.Wrap( //}
c.db(ctx).
Model(&relation.ConversationModel{}).
Where("conversation_id = ? and recv_msg_opt <> ?", conversationID, constant.ReceiveMessage).
Pluck("owner_user_id", &userIDs).Error,
)
}

@ -16,6 +16,7 @@ package relation
import ( import (
"context" "context"
"github.com/openimsdk/open-im-server/v3/pkg/common/pagination"
"time" "time"
) )
@ -23,25 +24,46 @@ const (
conversationModelTableName = "conversations" conversationModelTableName = "conversations"
) )
//type ConversationModel struct {
// OwnerUserID string `gorm:"column:owner_user_id;primary_key;type:char(128)" json:"OwnerUserID"`
// ConversationID string `gorm:"column:conversation_id;primary_key;type:char(128)" json:"conversationID"`
// ConversationType int32 `gorm:"column:conversation_type" json:"conversationType"`
// UserID string `gorm:"column:user_id;type:char(64)" json:"userID"`
// GroupID string `gorm:"column:group_id;type:char(128)" json:"groupID"`
// RecvMsgOpt int32 `gorm:"column:recv_msg_opt" json:"recvMsgOpt"`
// IsPinned bool `gorm:"column:is_pinned" json:"isPinned"`
// IsPrivateChat bool `gorm:"column:is_private_chat" json:"isPrivateChat"`
// BurnDuration int32 `gorm:"column:burn_duration;default:30" json:"burnDuration"`
// GroupAtType int32 `gorm:"column:group_at_type" json:"groupAtType"`
// AttachedInfo string `gorm:"column:attached_info;type:varchar(1024)" json:"attachedInfo"`
// Ex string `gorm:"column:ex;type:varchar(1024)" json:"ex"`
// MaxSeq int64 `gorm:"column:max_seq" json:"maxSeq"`
// MinSeq int64 `gorm:"column:min_seq" json:"minSeq"`
// CreateTime time.Time `gorm:"column:create_time;index:create_time;autoCreateTime"`
// IsMsgDestruct bool `gorm:"column:is_msg_destruct;default:false"`
// MsgDestructTime int64 `gorm:"column:msg_destruct_time;default:604800"`
// LatestMsgDestructTime time.Time `gorm:"column:latest_msg_destruct_time;autoCreateTime"`
//}
type ConversationModel struct { type ConversationModel struct {
OwnerUserID string `gorm:"column:owner_user_id;primary_key;type:char(128)" json:"OwnerUserID"` OwnerUserID string `bson:"owner_user_id"`
ConversationID string `gorm:"column:conversation_id;primary_key;type:char(128)" json:"conversationID"` ConversationID string `bson:"conversation_id"`
ConversationType int32 `gorm:"column:conversation_type" json:"conversationType"` ConversationType int32 `bson:"conversation_type"`
UserID string `gorm:"column:user_id;type:char(64)" json:"userID"` UserID string `bson:"user_id"`
GroupID string `gorm:"column:group_id;type:char(128)" json:"groupID"` GroupID string `bson:"group_id"`
RecvMsgOpt int32 `gorm:"column:recv_msg_opt" json:"recvMsgOpt"` RecvMsgOpt int32 `bson:"recv_msg_opt"`
IsPinned bool `gorm:"column:is_pinned" json:"isPinned"` IsPinned bool `bson:"is_pinned"`
IsPrivateChat bool `gorm:"column:is_private_chat" json:"isPrivateChat"` IsPrivateChat bool `bson:"is_private_chat"`
BurnDuration int32 `gorm:"column:burn_duration;default:30" json:"burnDuration"` BurnDuration int32 `bson:"burn_duration"`
GroupAtType int32 `gorm:"column:group_at_type" json:"groupAtType"` GroupAtType int32 `bson:"group_at_type"`
AttachedInfo string `gorm:"column:attached_info;type:varchar(1024)" json:"attachedInfo"` AttachedInfo string `bson:"attached_info"`
Ex string `gorm:"column:ex;type:varchar(1024)" json:"ex"` Ex string `bson:"ex"`
MaxSeq int64 `gorm:"column:max_seq" json:"maxSeq"` MaxSeq int64 `bson:"max_seq"`
MinSeq int64 `gorm:"column:min_seq" json:"minSeq"` MinSeq int64 `bson:"min_seq"`
CreateTime time.Time `gorm:"column:create_time;index:create_time;autoCreateTime"` CreateTime time.Time `bson:"create_time"`
IsMsgDestruct bool `gorm:"column:is_msg_destruct;default:false"` IsMsgDestruct bool `bson:"is_msg_destruct"`
MsgDestructTime int64 `gorm:"column:msg_destruct_time;default:604800"` MsgDestructTime int64 `bson:"msg_destruct_time"`
LatestMsgDestructTime time.Time `gorm:"column:latest_msg_destruct_time;autoCreateTime"` LatestMsgDestructTime time.Time `bson:"latest_msg_destruct_time"`
} }
func (ConversationModel) TableName() string { func (ConversationModel) TableName() string {
@ -61,11 +83,11 @@ type ConversationModelInterface interface {
FindUserIDAllConversations(ctx context.Context, userID string) (conversations []*ConversationModel, err error) FindUserIDAllConversations(ctx context.Context, userID string) (conversations []*ConversationModel, err error)
FindRecvMsgNotNotifyUserIDs(ctx context.Context, groupID string) ([]string, error) FindRecvMsgNotNotifyUserIDs(ctx context.Context, groupID string) ([]string, error)
GetUserRecvMsgOpt(ctx context.Context, ownerUserID, conversationID string) (opt int, err error) GetUserRecvMsgOpt(ctx context.Context, ownerUserID, conversationID string) (opt int, err error)
FindSuperGroupRecvMsgNotNotifyUserIDs(ctx context.Context, groupID string) ([]string, error) //FindSuperGroupRecvMsgNotNotifyUserIDs(ctx context.Context, groupID string) ([]string, error)
GetAllConversationIDs(ctx context.Context) ([]string, error) GetAllConversationIDs(ctx context.Context) ([]string, error)
GetAllConversationIDsNumber(ctx context.Context) (int64, error) GetAllConversationIDsNumber(ctx context.Context) (int64, error)
PageConversationIDs(ctx context.Context, pageNumber, showNumber int32) (conversationIDs []string, err error) PageConversationIDs(ctx context.Context, pagination pagination.Pagination) (conversationIDs []string, err error)
GetUserAllHasReadSeqs(ctx context.Context, ownerUserID string) (hashReadSeqs map[string]int64, err error) //GetUserAllHasReadSeqs(ctx context.Context, ownerUserID string) (hashReadSeqs map[string]int64, err error)
GetConversationsByConversationID(ctx context.Context, conversationIDs []string) ([]*ConversationModel, error) GetConversationsByConversationID(ctx context.Context, conversationIDs []string) ([]*ConversationModel, error)
GetConversationIDsNeedDestruct(ctx context.Context) ([]*ConversationModel, error) GetConversationIDsNeedDestruct(ctx context.Context) ([]*ConversationModel, error)
GetConversationNotReceiveMessageUserIDs(ctx context.Context, conversationID string) ([]string, error) GetConversationNotReceiveMessageUserIDs(ctx context.Context, conversationID string) ([]string, error)

Loading…
Cancel
Save