diff --git a/internal/rpc/conversation/conversaion.go b/internal/rpc/conversation/conversaion.go index f117847aa..0ae9ed9a9 100644 --- a/internal/rpc/conversation/conversaion.go +++ b/internal/rpc/conversation/conversaion.go @@ -16,6 +16,9 @@ package conversation import ( "context" + "errors" + "github.com/openimsdk/open-im-server/v3/pkg/common/db/newmgo" + "github.com/openimsdk/open-im-server/v3/pkg/common/db/unrelation" "google.golang.org/grpc" @@ -54,7 +57,14 @@ func Start(client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) e if err != nil { return err } - conversationDB := relation.NewConversationGorm(db) + mongo, err := unrelation.NewMongo() + if err != nil { + return err + } + conversationDB, err := newmgo.NewConversationMongo(mongo.GetDatabase()) + if err != nil { + return err + } groupRpcClient := rpcclient.NewGroupRpcClient(client) msgRpcClient := rpcclient.NewMessageRpcClient(client) pbconversation.RegisterConversationServer(server, &conversationServer{ @@ -229,11 +239,12 @@ func (c *conversationServer) SetConversations(ctx context.Context, // 获取超级大群开启免打扰的用户ID. func (c *conversationServer) GetRecvMsgNotNotifyUserIDs(ctx context.Context, req *pbconversation.GetRecvMsgNotNotifyUserIDsReq) (*pbconversation.GetRecvMsgNotNotifyUserIDsResp, error) { - userIDs, err := c.conversationDatabase.FindRecvMsgNotNotifyUserIDs(ctx, req.GroupID) - if err != nil { - return nil, err - } - return &pbconversation.GetRecvMsgNotNotifyUserIDsResp{UserIDs: userIDs}, nil + //userIDs, err := c.conversationDatabase.FindRecvMsgNotNotifyUserIDs(ctx, req.GroupID) + //if err != nil { + // return nil, err + //} + //return &pbconversation.GetRecvMsgNotNotifyUserIDsResp{UserIDs: userIDs}, nil + return nil, errors.New("deprecated") } // create conversation without notification for msg redis transfer. diff --git a/internal/tools/conversation.go b/internal/tools/conversation.go index 1dbb900b1..c91e92078 100644 --- a/internal/tools/conversation.go +++ b/internal/tools/conversation.go @@ -16,6 +16,7 @@ package tools import ( "context" + "github.com/OpenIMSDK/protocol/sdkws" "math/rand" "time" @@ -91,7 +92,11 @@ func (c *MsgTool) ConversationsDestructMsgs() { } for i := 0; i < count; i++ { pageNumber := rand.Int63() % maxPage - conversationIDs, err := c.conversationDatabase.PageConversationIDs(ctx, int32(pageNumber), batchNum) + pagination := &sdkws.RequestPagination{ + PageNumber: int32(pageNumber), + ShowNumber: batchNum, + } + conversationIDs, err := c.conversationDatabase.PageConversationIDs(ctx, pagination) if err != nil { log.ZError(ctx, "PageConversationIDs failed", err, "pageNumber", pageNumber) continue diff --git a/internal/tools/msg.go b/internal/tools/msg.go index cb375c225..63dc16f21 100644 --- a/internal/tools/msg.go +++ b/internal/tools/msg.go @@ -17,6 +17,7 @@ package tools import ( "context" "fmt" + "github.com/OpenIMSDK/protocol/sdkws" "github.com/openimsdk/open-im-server/v3/pkg/common/db/newmgo" "math" @@ -106,10 +107,14 @@ func InitMsgTool() (*MsgTool, error) { if err != nil { return nil, err } + conversationDB, err := newmgo.NewConversationMongo(mongo.GetDatabase()) + if err != nil { + return nil, err + } groupDatabase := controller.NewGroupDatabase(rdb, groupDB, groupMemberDB, groupRequestDB, tx.NewMongo(mongo.GetClient()), nil) conversationDatabase := controller.NewConversationDatabase( - relation.NewConversationGorm(db), - cache.NewConversationRedis(rdb, cache.GetDefaultOpt(), relation.NewConversationGorm(db)), + conversationDB, + cache.NewConversationRedis(rdb, cache.GetDefaultOpt(), conversationDB), tx.NewGorm(db), ) msgRpcClient := rpcclient.NewMessageRpcClient(discov) @@ -156,7 +161,11 @@ func (c *MsgTool) AllConversationClearMsgAndFixSeq() { } for i := 0; i < count; i++ { pageNumber := rand.Int63() % maxPage - conversationIDs, err := c.conversationDatabase.PageConversationIDs(ctx, int32(pageNumber), batchNum) + pagination := &sdkws.RequestPagination{ + PageNumber: int32(pageNumber), + ShowNumber: batchNum, + } + conversationIDs, err := c.conversationDatabase.PageConversationIDs(ctx, pagination) if err != nil { log.ZError(ctx, "PageConversationIDs failed", err, "pageNumber", pageNumber) continue diff --git a/pkg/common/db/cache/conversation.go b/pkg/common/db/cache/conversation.go index 9c0bcfae4..a7018bc18 100644 --- a/pkg/common/db/cache/conversation.go +++ b/pkg/common/db/cache/conversation.go @@ -26,7 +26,6 @@ import ( "github.com/OpenIMSDK/tools/utils" - "github.com/openimsdk/open-im-server/v3/pkg/common/db/relation" relationtb "github.com/openimsdk/open-im-server/v3/pkg/common/db/table/relation" ) @@ -67,10 +66,10 @@ type ConversationCache interface { GetUserRecvMsgOpt(ctx context.Context, ownerUserID, conversationID string) (opt int, err error) DelUserRecvMsgOpt(ownerUserID, conversationID string) ConversationCache // get one super group recv msg but do not notification userID list - GetSuperGroupRecvMsgNotNotifyUserIDs(ctx context.Context, groupID string) (userIDs []string, err error) + //GetSuperGroupRecvMsgNotNotifyUserIDs(ctx context.Context, groupID string) (userIDs []string, err error) DelSuperGroupRecvMsgNotNotifyUserIDs(groupID string) ConversationCache // get one super group recv msg but do not notification userID list hash - GetSuperGroupRecvMsgNotNotifyUserIDsHash(ctx context.Context, groupID string) (hash uint64, err error) + //GetSuperGroupRecvMsgNotNotifyUserIDsHash(ctx context.Context, groupID string) (hash uint64, err error) DelSuperGroupRecvMsgNotNotifyUserIDsHash(groupID string) ConversationCache //GetUserAllHasReadSeqs(ctx context.Context, ownerUserID string) (map[string]int64, error) @@ -101,20 +100,20 @@ type ConversationRedisCache struct { expireTime time.Duration } -func NewNewConversationRedis( - rdb redis.UniversalClient, - conversationDB *relation.ConversationGorm, - options rockscache.Options, -) ConversationCache { - rcClient := rockscache.NewClient(rdb, options) - - return &ConversationRedisCache{ - rcClient: rcClient, - metaCache: NewMetaCacheRedis(rcClient), - conversationDB: conversationDB, - expireTime: conversationExpireTime, - } -} +//func NewNewConversationRedis( +// rdb redis.UniversalClient, +// conversationDB *relation.ConversationGorm, +// options rockscache.Options, +//) ConversationCache { +// rcClient := rockscache.NewClient(rdb, options) +// +// return &ConversationRedisCache{ +// rcClient: rcClient, +// metaCache: NewMetaCacheRedis(rcClient), +// conversationDB: conversationDB, +// expireTime: conversationExpireTime, +// } +//} func (c *ConversationRedisCache) NewCache() ConversationCache { return &ConversationRedisCache{ @@ -282,11 +281,11 @@ func (c *ConversationRedisCache) GetUserRecvMsgOpt(ctx context.Context, ownerUse }) } -func (c *ConversationRedisCache) GetSuperGroupRecvMsgNotNotifyUserIDs(ctx context.Context, groupID string) (userIDs []string, err error) { - return getCache(ctx, c.rcClient, c.getSuperGroupRecvNotNotifyUserIDsKey(groupID), c.expireTime, func(ctx context.Context) (userIDs []string, err error) { - return c.conversationDB.FindSuperGroupRecvMsgNotNotifyUserIDs(ctx, groupID) - }) -} +//func (c *ConversationRedisCache) GetSuperGroupRecvMsgNotNotifyUserIDs(ctx context.Context, groupID string) (userIDs []string, err error) { +// return getCache(ctx, c.rcClient, c.getSuperGroupRecvNotNotifyUserIDsKey(groupID), c.expireTime, func(ctx context.Context) (userIDs []string, err error) { +// return c.conversationDB.FindSuperGroupRecvMsgNotNotifyUserIDs(ctx, groupID) +// }) +//} func (c *ConversationRedisCache) DelUsersConversation(conversationID string, ownerUserIDs ...string) ConversationCache { keys := make([]string, 0, len(ownerUserIDs)) @@ -313,19 +312,19 @@ func (c *ConversationRedisCache) DelSuperGroupRecvMsgNotNotifyUserIDs(groupID st return cache } -func (c *ConversationRedisCache) GetSuperGroupRecvMsgNotNotifyUserIDsHash(ctx context.Context, groupID string) (hash uint64, err error) { - return getCache(ctx, c.rcClient, c.getSuperGroupRecvNotNotifyUserIDsHashKey(groupID), c.expireTime, func(ctx context.Context) (hash uint64, err error) { - userIDs, err := c.GetSuperGroupRecvMsgNotNotifyUserIDs(ctx, groupID) - if err != nil { - return 0, err - } - utils.Sort(userIDs, true) - bi := big.NewInt(0) - bi.SetString(utils.Md5(strings.Join(userIDs, ";"))[0:8], 16) - return bi.Uint64(), nil - }, - ) -} +//func (c *ConversationRedisCache) GetSuperGroupRecvMsgNotNotifyUserIDsHash(ctx context.Context, groupID string) (hash uint64, err error) { +// return getCache(ctx, c.rcClient, c.getSuperGroupRecvNotNotifyUserIDsHashKey(groupID), c.expireTime, func(ctx context.Context) (hash uint64, err error) { +// userIDs, err := c.GetSuperGroupRecvMsgNotNotifyUserIDs(ctx, groupID) +// if err != nil { +// return 0, err +// } +// utils.Sort(userIDs, true) +// bi := big.NewInt(0) +// bi.SetString(utils.Md5(strings.Join(userIDs, ";"))[0:8], 16) +// return bi.Uint64(), nil +// }, +// ) +//} func (c *ConversationRedisCache) DelSuperGroupRecvMsgNotNotifyUserIDsHash(groupID string) ConversationCache { cache := c.NewCache() diff --git a/pkg/common/db/controller/conversation.go b/pkg/common/db/controller/conversation.go index 670becf64..f905eb723 100644 --- a/pkg/common/db/controller/conversation.go +++ b/pkg/common/db/controller/conversation.go @@ -16,6 +16,7 @@ package controller import ( "context" + "github.com/openimsdk/open-im-server/v3/pkg/common/pagination" "time" "github.com/openimsdk/open-im-server/v3/pkg/msgprocessor" @@ -39,7 +40,7 @@ type ConversationDatabase interface { // FindConversations 根据会话ID获取某个用户的多个会话 FindConversations(ctx context.Context, ownerUserID string, conversationIDs []string) ([]*relationtb.ConversationModel, error) // FindRecvMsgNotNotifyUserIDs 获取超级大群开启免打扰的用户ID - FindRecvMsgNotNotifyUserIDs(ctx context.Context, groupID string) ([]string, error) + //FindRecvMsgNotNotifyUserIDs(ctx context.Context, groupID string) ([]string, error) // GetUserAllConversation 获取一个用户在服务器上所有的会话 GetUserAllConversation(ctx context.Context, ownerUserID string) ([]*relationtb.ConversationModel, error) // SetUserConversations 设置用户多个会话属性,如果会话不存在则创建,否则更新,内部保证原子性 @@ -51,7 +52,7 @@ type ConversationDatabase interface { GetUserConversationIDsHash(ctx context.Context, ownerUserID string) (hash uint64, err error) GetAllConversationIDs(ctx context.Context) ([]string, error) GetAllConversationIDsNumber(ctx context.Context) (int64, error) - PageConversationIDs(ctx context.Context, pageNumber, showNumber int32) (conversationIDs []string, err error) + PageConversationIDs(ctx context.Context, pagination pagination.Pagination) (conversationIDs []string, err error) //GetUserAllHasReadSeqs(ctx context.Context, ownerUserID string) (map[string]int64, error) GetConversationsByConversationID(ctx context.Context, conversationIDs []string) ([]*relationtb.ConversationModel, error) GetConversationIDsNeedDestruct(ctx context.Context) ([]*relationtb.ConversationModel, error) @@ -255,9 +256,9 @@ func (c *conversationDatabase) SetUserConversations(ctx context.Context, ownerUs return cache.ExecDel(ctx) } -func (c *conversationDatabase) FindRecvMsgNotNotifyUserIDs(ctx context.Context, groupID string) ([]string, error) { - return c.cache.GetSuperGroupRecvMsgNotNotifyUserIDs(ctx, groupID) -} +//func (c *conversationDatabase) FindRecvMsgNotNotifyUserIDs(ctx context.Context, groupID string) ([]string, error) { +// return c.cache.GetSuperGroupRecvMsgNotNotifyUserIDs(ctx, groupID) +//} func (c *conversationDatabase) CreateGroupChatConversation(ctx context.Context, groupID string, userIDs []string) error { cache := c.cache.NewCache() @@ -311,14 +312,10 @@ func (c *conversationDatabase) GetAllConversationIDsNumber(ctx context.Context) return c.conversationDB.GetAllConversationIDsNumber(ctx) } -func (c *conversationDatabase) PageConversationIDs(ctx context.Context, pageNumber, showNumber int32) ([]string, error) { - return c.conversationDB.PageConversationIDs(ctx, pageNumber, showNumber) +func (c *conversationDatabase) PageConversationIDs(ctx context.Context, pagination pagination.Pagination) ([]string, error) { + return c.conversationDB.PageConversationIDs(ctx, pagination) } -//func (c *conversationDatabase) GetUserAllHasReadSeqs(ctx context.Context, ownerUserID string) (map[string]int64, error) { -// return c.cache.GetUserAllHasReadSeqs(ctx, ownerUserID) -//} - func (c *conversationDatabase) GetConversationsByConversationID(ctx context.Context, conversationIDs []string) ([]*relationtb.ConversationModel, error) { return c.conversationDB.GetConversationsByConversationID(ctx, conversationIDs) } diff --git a/pkg/common/db/newmgo/conversation.go b/pkg/common/db/newmgo/conversation.go new file mode 100644 index 000000000..863b3ad6c --- /dev/null +++ b/pkg/common/db/newmgo/conversation.go @@ -0,0 +1,134 @@ +package newmgo + +import ( + "context" + "github.com/OpenIMSDK/protocol/constant" + "github.com/openimsdk/open-im-server/v3/pkg/common/db/newmgo/mgotool" + "github.com/openimsdk/open-im-server/v3/pkg/common/db/table/relation" + "github.com/openimsdk/open-im-server/v3/pkg/common/pagination" + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" + "time" +) + +func NewConversationMongo(db *mongo.Database) (*ConversationMgo, error) { + return &ConversationMgo{ + coll: db.Collection("conversation"), + }, nil +} + +type ConversationMgo struct { + coll *mongo.Collection +} + +func (c *ConversationMgo) Create(ctx context.Context, conversations []*relation.ConversationModel) (err error) { + return mgotool.InsertMany(ctx, c.coll, conversations) +} + +func (c *ConversationMgo) Delete(ctx context.Context, groupIDs []string) (err error) { + return mgotool.DeleteMany(ctx, c.coll, bson.M{"group_id": bson.M{"$in": groupIDs}}) +} + +func (c *ConversationMgo) UpdateByMap(ctx context.Context, userIDs []string, conversationID string, args map[string]any) (rows int64, err error) { + res, err := mgotool.UpdateMany(ctx, c.coll, bson.M{"owner_user_id": bson.M{"$in": userIDs}, "conversation_id": conversationID}, bson.M{"$set": args}) + if err != nil { + return 0, err + } + return res.ModifiedCount, nil +} + +func (c *ConversationMgo) Update(ctx context.Context, conversation *relation.ConversationModel) (err error) { + return mgotool.UpdateOne(ctx, c.coll, bson.M{"owner_user_id": conversation.OwnerUserID, "conversation_id": conversation.ConversationID}, bson.M{"$set": conversation}, true) +} + +func (c *ConversationMgo) Find(ctx context.Context, ownerUserID string, conversationIDs []string) (conversations []*relation.ConversationModel, err error) { + return mgotool.Find[*relation.ConversationModel](ctx, c.coll, bson.M{"owner_user_id": ownerUserID, "conversation_id": bson.M{"$in": conversationIDs}}) +} + +func (c *ConversationMgo) FindUserID(ctx context.Context, userIDs []string, conversationIDs []string) ([]string, error) { + return mgotool.Find[string](ctx, c.coll, bson.M{"owner_user_id": bson.M{"$in": userIDs}, "conversation_id": bson.M{"$in": conversationIDs}}, options.Find().SetProjection(bson.M{"owner_user_id": 1})) +} + +func (c *ConversationMgo) FindUserIDAllConversationID(ctx context.Context, userID string) ([]string, error) { + return mgotool.Find[string](ctx, c.coll, bson.M{"owner_user_id": userID}, options.Find().SetProjection(bson.M{"conversation_id": 1})) +} + +func (c *ConversationMgo) Take(ctx context.Context, userID, conversationID string) (conversation *relation.ConversationModel, err error) { + return mgotool.FindOne[*relation.ConversationModel](ctx, c.coll, bson.M{"owner_user_id": userID, "conversation_id": conversationID}) +} + +func (c *ConversationMgo) FindConversationID(ctx context.Context, userID string, conversationIDs []string) (existConversationID []string, err error) { + return mgotool.Find[string](ctx, c.coll, bson.M{"owner_user_id": userID, "conversation_id": bson.M{"$in": conversationIDs}}, options.Find().SetProjection(bson.M{"conversation_id": 1})) +} + +func (c *ConversationMgo) FindUserIDAllConversations(ctx context.Context, userID string) (conversations []*relation.ConversationModel, err error) { + return mgotool.Find[*relation.ConversationModel](ctx, c.coll, bson.M{"owner_user_id": userID}) +} + +func (c *ConversationMgo) FindRecvMsgNotNotifyUserIDs(ctx context.Context, groupID string) ([]string, error) { + return mgotool.Find[string](ctx, c.coll, bson.M{"group_id": groupID, "recv_msg_opt": constant.ReceiveNotNotifyMessage}, options.Find().SetProjection(bson.M{"owner_user_id": 1})) +} + +func (c *ConversationMgo) GetUserRecvMsgOpt(ctx context.Context, ownerUserID, conversationID string) (opt int, err error) { + return mgotool.FindOne[int](ctx, c.coll, bson.M{"owner_user_id": ownerUserID, "conversation_id": conversationID}, options.FindOne().SetProjection(bson.M{"recv_msg_opt": 1})) +} + +func (c *ConversationMgo) GetAllConversationIDs(ctx context.Context) ([]string, error) { + return mgotool.Aggregate[string](ctx, c.coll, []bson.M{ + {"$group": bson.M{"_id": "$conversation_id"}}, + {"$project": bson.M{"_id": 0, "conversation_id": "$_id"}}, + }) +} + +func (c *ConversationMgo) GetAllConversationIDsNumber(ctx context.Context) (int64, error) { + counts, err := mgotool.Aggregate[int64](ctx, c.coll, []bson.M{ + {"$group": bson.M{"_id": "$conversation_id"}}, + {"$project": bson.M{"_id": 0, "conversation_id": "$_id"}}, + }) + if err != nil { + return 0, err + } + if len(counts) == 0 { + return 0, nil + } + return counts[0], nil +} + +func (c *ConversationMgo) PageConversationIDs(ctx context.Context, pagination pagination.Pagination) (conversationIDs []string, err error) { + return mgotool.FindPageOnly[string](ctx, c.coll, bson.M{}, pagination, options.Find().SetProjection(bson.M{"conversation_id": 1})) +} + +func (c *ConversationMgo) GetConversationsByConversationID(ctx context.Context, conversationIDs []string) ([]*relation.ConversationModel, error) { + return mgotool.Find[*relation.ConversationModel](ctx, c.coll, bson.M{"conversation_id": bson.M{"$in": conversationIDs}}) +} + +func (c *ConversationMgo) GetConversationIDsNeedDestruct(ctx context.Context) ([]*relation.ConversationModel, error) { + //"is_msg_destruct = 1 && msg_destruct_time != 0 && (UNIX_TIMESTAMP(NOW()) > (msg_destruct_time + UNIX_TIMESTAMP(latest_msg_destruct_time)) || latest_msg_destruct_time is NULL)" + return mgotool.Find[*relation.ConversationModel](ctx, c.coll, bson.M{ + "is_msg_destruct": 1, + "msg_destruct_time": bson.M{"$ne": 0}, + "$or": []bson.M{ + { + "$expr": bson.M{ + "$gt": []any{ + time.Now(), + bson.M{"$add": []any{"$msg_destruct_time", "$latest_msg_destruct_time"}}, + }, + }, + }, + { + "latest_msg_destruct_time": nil, + }, + }, + }) +} + +func (c *ConversationMgo) GetConversationNotReceiveMessageUserIDs(ctx context.Context, conversationID string) ([]string, error) { + return mgotool.Find[string](ctx, c.coll, bson.M{"conversation_id": conversationID, "recv_msg_opt": bson.M{"$ne": constant.ReceiveMessage}}, options.Find().SetProjection(bson.M{"owner_user_id": 1})) +} + +func (c *ConversationMgo) NewTx(tx any) relation.ConversationModelInterface { + //TODO implement me + panic("implement me") +} diff --git a/pkg/common/db/newmgo/mgotool/tool.go b/pkg/common/db/newmgo/mgotool/tool.go index 74f0a3517..01177edba 100644 --- a/pkg/common/db/newmgo/mgotool/tool.go +++ b/pkg/common/db/newmgo/mgotool/tool.go @@ -48,6 +48,14 @@ func UpdateOne(ctx context.Context, coll *mongo.Collection, filter any, update a return nil } +func UpdateMany(ctx context.Context, coll *mongo.Collection, filter any, update any, opts ...*options.UpdateOptions) (*mongo.UpdateResult, error) { + res, err := coll.UpdateMany(ctx, filter, update, opts...) + if err != nil { + return nil, errs.Wrap(err) + } + return res, nil +} + func Find[T any](ctx context.Context, coll *mongo.Collection, filter any, opts ...*options.FindOptions) ([]T, error) { cur, err := coll.Find(ctx, filter, opts...) if err != nil { @@ -92,6 +100,15 @@ func FindPage[T any](ctx context.Context, coll *mongo.Collection, filter any, pa return count, res, nil } +func FindPageOnly[T any](ctx context.Context, coll *mongo.Collection, filter any, pagination pagination.Pagination, opts ...*options.FindOptions) ([]T, error) { + skip := int64(pagination.GetPageNumber()-1) * int64(pagination.GetShowNumber()) + if skip < 0 || pagination.GetShowNumber() <= 0 { + return nil, nil + } + opt := options.Find().SetSkip(skip).SetLimit(int64(pagination.GetShowNumber())) + return Find[T](ctx, coll, filter, append(opts, opt)...) +} + func Count(ctx context.Context, coll *mongo.Collection, filter any, opts ...*options.CountOptions) (int64, error) { return coll.CountDocuments(ctx, filter, opts...) } @@ -119,6 +136,14 @@ func DeleteMany(ctx context.Context, coll *mongo.Collection, filter any, opts .. return nil } -//func Upsert[T any](ctx context.Context, coll *mongo.Collection, val *T, opts ...*options.InsertManyOptions) error { -// return nil -//} +func Aggregate[T any](ctx context.Context, coll *mongo.Collection, pipeline any, opts ...*options.AggregateOptions) ([]T, error) { + cur, err := coll.Aggregate(ctx, pipeline, opts...) + if err != nil { + return nil, err + } + var ts []T + if err := cur.All(ctx, &ts); err != nil { + return nil, err + } + return ts, nil +} diff --git a/pkg/common/db/newmgo/user_mgo.go b/pkg/common/db/newmgo/user.go similarity index 100% rename from pkg/common/db/newmgo/user_mgo.go rename to pkg/common/db/newmgo/user.go diff --git a/pkg/common/db/relation/conversation_model.go b/pkg/common/db/relation/conversation_model.go index 329c35f94..73cf1a80e 100644 --- a/pkg/common/db/relation/conversation_model.go +++ b/pkg/common/db/relation/conversation_model.go @@ -14,237 +14,231 @@ package relation -import ( - "context" - - "github.com/OpenIMSDK/tools/errs" - "gorm.io/gorm" - - "github.com/OpenIMSDK/protocol/constant" - "github.com/OpenIMSDK/tools/utils" - - "github.com/openimsdk/open-im-server/v3/pkg/common/db/table/relation" -) - -type ConversationGorm struct { - *MetaDB -} - -func NewConversationGorm(db *gorm.DB) relation.ConversationModelInterface { - return &ConversationGorm{NewMetaDB(db, &relation.ConversationModel{})} -} - -func (c *ConversationGorm) NewTx(tx any) relation.ConversationModelInterface { - return &ConversationGorm{NewMetaDB(tx.(*gorm.DB), &relation.ConversationModel{})} -} - -func (c *ConversationGorm) Create(ctx context.Context, conversations []*relation.ConversationModel) (err error) { - return utils.Wrap(c.db(ctx).Create(&conversations).Error, "") -} - -func (c *ConversationGorm) Delete(ctx context.Context, groupIDs []string) (err error) { - return utils.Wrap(c.db(ctx).Where("group_id in (?)", groupIDs).Delete(&relation.ConversationModel{}).Error, "") -} - -func (c *ConversationGorm) UpdateByMap( - ctx context.Context, - userIDList []string, - conversationID string, - args map[string]any, -) (rows int64, err error) { - result := c.db(ctx).Where("owner_user_id IN (?) and conversation_id=?", userIDList, conversationID).Updates(args) - return result.RowsAffected, utils.Wrap(result.Error, "") -} - -func (c *ConversationGorm) Update(ctx context.Context, conversation *relation.ConversationModel) (err error) { - return utils.Wrap( - c.db(ctx). - Where("owner_user_id = ? and conversation_id = ?", conversation.OwnerUserID, conversation.ConversationID). - Updates(conversation). - Error, - "", - ) -} - -func (c *ConversationGorm) Find( - ctx context.Context, - ownerUserID string, - conversationIDs []string, -) (conversations []*relation.ConversationModel, err error) { - err = utils.Wrap( - c.db(ctx). - Where("owner_user_id=? and conversation_id IN (?)", ownerUserID, conversationIDs). - Find(&conversations). - Error, - "", - ) - return conversations, err -} - -func (c *ConversationGorm) Take( - ctx context.Context, - userID, conversationID string, -) (conversation *relation.ConversationModel, err error) { - cc := &relation.ConversationModel{} - return cc, utils.Wrap( - c.db(ctx).Where("conversation_id = ? And owner_user_id = ?", conversationID, userID).Take(cc).Error, - "", - ) -} - -func (c *ConversationGorm) FindUserID( - ctx context.Context, - userIDs []string, - conversationIDs []string, -) (existUserID []string, err error) { - return existUserID, utils.Wrap( - c.db(ctx). - Where(" owner_user_id IN (?) and conversation_id in (?)", userIDs, conversationIDs). - Pluck("owner_user_id", &existUserID). - Error, - "", - ) -} - -func (c *ConversationGorm) FindConversationID( - ctx context.Context, - userID string, - conversationIDList []string, -) (existConversationID []string, err error) { - return existConversationID, utils.Wrap( - c.db(ctx). - Where(" conversation_id IN (?) and owner_user_id=?", conversationIDList, userID). - Pluck("conversation_id", &existConversationID). - Error, - "", - ) -} - -func (c *ConversationGorm) FindUserIDAllConversationID( - ctx context.Context, - userID string, -) (conversationIDList []string, err error) { - return conversationIDList, utils.Wrap( - c.db(ctx).Where("owner_user_id=?", userID).Pluck("conversation_id", &conversationIDList).Error, - "", - ) -} - -func (c *ConversationGorm) FindUserIDAllConversations( - ctx context.Context, - userID string, -) (conversations []*relation.ConversationModel, err error) { - return conversations, utils.Wrap(c.db(ctx).Where("owner_user_id=?", userID).Find(&conversations).Error, "") -} - -func (c *ConversationGorm) FindRecvMsgNotNotifyUserIDs( - ctx context.Context, - groupID string, -) (userIDs []string, err error) { - return userIDs, utils.Wrap( - c.db(ctx). - Where("group_id = ? and recv_msg_opt = ?", groupID, constant.ReceiveNotNotifyMessage). - Pluck("owner_user_id", &userIDs). - Error, - "", - ) -} - -func (c *ConversationGorm) FindSuperGroupRecvMsgNotNotifyUserIDs( - ctx context.Context, - groupID string, -) (userIDs []string, err error) { - return userIDs, utils.Wrap( - c.db(ctx). - Where("group_id = ? and recv_msg_opt = ? and conversation_type = ?", groupID, constant.ReceiveNotNotifyMessage, constant.SuperGroupChatType). - Pluck("owner_user_id", &userIDs). - Error, - "", - ) -} - -func (c *ConversationGorm) GetUserRecvMsgOpt( - ctx context.Context, - ownerUserID, conversationID string, -) (opt int, err error) { - var conversation relation.ConversationModel - return int( - conversation.RecvMsgOpt, - ), utils.Wrap( - c.db(ctx). - Where("conversation_id = ? And owner_user_id = ?", conversationID, ownerUserID). - Select("recv_msg_opt"). - Find(&conversation). - Error, - "", - ) -} - -func (c *ConversationGorm) GetAllConversationIDs(ctx context.Context) (conversationIDs []string, err error) { - return conversationIDs, utils.Wrap( - c.db(ctx).Distinct("conversation_id").Pluck("conversation_id", &conversationIDs).Error, - "", - ) -} - -func (c *ConversationGorm) GetAllConversationIDsNumber(ctx context.Context) (int64, error) { - var num int64 - err := c.db(ctx).Select("COUNT(DISTINCT conversation_id)").Model(&relation.ConversationModel{}).Count(&num).Error - return num, errs.Wrap(err) -} - -func (c *ConversationGorm) PageConversationIDs(ctx context.Context, pageNumber, showNumber int32) (conversationIDs []string, err error) { - err = c.db(ctx).Distinct("conversation_id").Limit(int(showNumber)).Offset(int((pageNumber-1)*showNumber)).Pluck("conversation_id", &conversationIDs).Error - err = errs.Wrap(err) - return -} - -func (c *ConversationGorm) GetUserAllHasReadSeqs( - ctx context.Context, - ownerUserID string, -) (hasReadSeqs map[string]int64, err error) { - return nil, nil -} - -func (c *ConversationGorm) GetConversationsByConversationID( - ctx context.Context, - conversationIDs []string, -) (conversations []*relation.ConversationModel, err error) { - return conversations, utils.Wrap( - c.db(ctx).Where("conversation_id IN (?)", conversationIDs).Find(&conversations).Error, - "", - ) -} - -func (c *ConversationGorm) GetConversationIDsNeedDestruct( - ctx context.Context, -) (conversations []*relation.ConversationModel, err error) { - return conversations, utils.Wrap( - c.db(ctx). - Where("is_msg_destruct = 1 && msg_destruct_time != 0 && (UNIX_TIMESTAMP(NOW()) > (msg_destruct_time + UNIX_TIMESTAMP(latest_msg_destruct_time)) || latest_msg_destruct_time is NULL)"). - Find(&conversations). - Error, - "", - ) -} - -func (c *ConversationGorm) GetConversationRecvMsgOpt(ctx context.Context, userID string, conversationID string) (int32, error) { - var recvMsgOpt int32 - return recvMsgOpt, errs.Wrap( - c.db(ctx). - Model(&relation.ConversationModel{}). - Where("conversation_id = ? and owner_user_id in ?", conversationID, userID). - Pluck("recv_msg_opt", &recvMsgOpt). - Error, - ) -} - -func (c *ConversationGorm) GetConversationNotReceiveMessageUserIDs(ctx context.Context, conversationID string) ([]string, error) { - var userIDs []string - return userIDs, errs.Wrap( - c.db(ctx). - Model(&relation.ConversationModel{}). - Where("conversation_id = ? and recv_msg_opt <> ?", conversationID, constant.ReceiveMessage). - Pluck("owner_user_id", &userIDs).Error, - ) -} +// +//import ( +// "context" +// +// "github.com/OpenIMSDK/tools/errs" +// "gorm.io/gorm" +// +// "github.com/OpenIMSDK/protocol/constant" +// "github.com/OpenIMSDK/tools/utils" +// +// "github.com/openimsdk/open-im-server/v3/pkg/common/db/table/relation" +//) +// +//type ConversationGorm struct { +// *MetaDB +//} +// +//func NewConversationGorm(db *gorm.DB) relation.ConversationModelInterface { +// return &ConversationGorm{NewMetaDB(db, &relation.ConversationModel{})} +//} +// +//func (c *ConversationGorm) NewTx(tx any) relation.ConversationModelInterface { +// return &ConversationGorm{NewMetaDB(tx.(*gorm.DB), &relation.ConversationModel{})} +//} +// +//func (c *ConversationGorm) Create(ctx context.Context, conversations []*relation.ConversationModel) (err error) { +// return utils.Wrap(c.db(ctx).Create(&conversations).Error, "") +//} +// +//func (c *ConversationGorm) Delete(ctx context.Context, groupIDs []string) (err error) { +// return utils.Wrap(c.db(ctx).Where("group_id in (?)", groupIDs).Delete(&relation.ConversationModel{}).Error, "") +//} +// +//func (c *ConversationGorm) UpdateByMap( +// ctx context.Context, +// userIDList []string, +// conversationID string, +// args map[string]any, +//) (rows int64, err error) { +// result := c.db(ctx).Where("owner_user_id IN (?) and conversation_id=?", userIDList, conversationID).Updates(args) +// return result.RowsAffected, utils.Wrap(result.Error, "") +//} +// +//func (c *ConversationGorm) Update(ctx context.Context, conversation *relation.ConversationModel) (err error) { +// return utils.Wrap( +// c.db(ctx). +// Where("owner_user_id = ? and conversation_id = ?", conversation.OwnerUserID, conversation.ConversationID). +// Updates(conversation). +// Error, +// "", +// ) +//} +// +//func (c *ConversationGorm) Find( +// ctx context.Context, +// ownerUserID string, +// conversationIDs []string, +//) (conversations []*relation.ConversationModel, err error) { +// err = utils.Wrap( +// c.db(ctx). +// Where("owner_user_id=? and conversation_id IN (?)", ownerUserID, conversationIDs). +// Find(&conversations). +// Error, +// "", +// ) +// return conversations, err +//} +// +//func (c *ConversationGorm) Take( +// ctx context.Context, +// userID, conversationID string, +//) (conversation *relation.ConversationModel, err error) { +// cc := &relation.ConversationModel{} +// return cc, utils.Wrap( +// c.db(ctx).Where("conversation_id = ? And owner_user_id = ?", conversationID, userID).Take(cc).Error, +// "", +// ) +//} +// +//func (c *ConversationGorm) FindUserID( +// ctx context.Context, +// userIDs []string, +// conversationIDs []string, +//) (existUserID []string, err error) { +// return existUserID, utils.Wrap( +// c.db(ctx). +// Where(" owner_user_id IN (?) and conversation_id in (?)", userIDs, conversationIDs). +// Pluck("owner_user_id", &existUserID). +// Error, +// "", +// ) +//} +// +//func (c *ConversationGorm) FindConversationID( +// ctx context.Context, +// userID string, +// conversationIDList []string, +//) (existConversationID []string, err error) { +// return existConversationID, utils.Wrap( +// c.db(ctx). +// Where(" conversation_id IN (?) and owner_user_id=?", conversationIDList, userID). +// Pluck("conversation_id", &existConversationID). +// Error, +// "", +// ) +//} +// +//func (c *ConversationGorm) FindUserIDAllConversationID( +// ctx context.Context, +// userID string, +//) (conversationIDList []string, err error) { +// return conversationIDList, utils.Wrap( +// c.db(ctx).Where("owner_user_id=?", userID).Pluck("conversation_id", &conversationIDList).Error, +// "", +// ) +//} +// +//func (c *ConversationGorm) FindUserIDAllConversations( +// ctx context.Context, +// userID string, +//) (conversations []*relation.ConversationModel, err error) { +// return conversations, utils.Wrap(c.db(ctx).Where("owner_user_id=?", userID).Find(&conversations).Error, "") +//} +// +//func (c *ConversationGorm) FindRecvMsgNotNotifyUserIDs( +// ctx context.Context, +// groupID string, +//) (userIDs []string, err error) { +// return userIDs, utils.Wrap( +// c.db(ctx). +// Where("group_id = ? and recv_msg_opt = ?", groupID, constant.ReceiveNotNotifyMessage). +// Pluck("owner_user_id", &userIDs). +// Error, +// "", +// ) +//} +// +//func (c *ConversationGorm) FindSuperGroupRecvMsgNotNotifyUserIDs( +// ctx context.Context, +// groupID string, +//) (userIDs []string, err error) { +// return userIDs, utils.Wrap( +// c.db(ctx). +// Where("group_id = ? and recv_msg_opt = ? and conversation_type = ?", groupID, constant.ReceiveNotNotifyMessage, constant.SuperGroupChatType). +// Pluck("owner_user_id", &userIDs). +// Error, +// "", +// ) +//} +// +//func (c *ConversationGorm) GetUserRecvMsgOpt( +// ctx context.Context, +// ownerUserID, conversationID string, +//) (opt int, err error) { +// var conversation relation.ConversationModel +// return int( +// conversation.RecvMsgOpt, +// ), utils.Wrap( +// c.db(ctx). +// Where("conversation_id = ? And owner_user_id = ?", conversationID, ownerUserID). +// Select("recv_msg_opt"). +// Find(&conversation). +// Error, +// "", +// ) +//} +// +//func (c *ConversationGorm) GetAllConversationIDs(ctx context.Context) (conversationIDs []string, err error) { +// return conversationIDs, utils.Wrap( +// c.db(ctx).Distinct("conversation_id").Pluck("conversation_id", &conversationIDs).Error, +// "", +// ) +//} +// +//func (c *ConversationGorm) GetAllConversationIDsNumber(ctx context.Context) (int64, error) { +// var num int64 +// err := c.db(ctx).Select("COUNT(DISTINCT conversation_id)").Model(&relation.ConversationModel{}).Count(&num).Error +// return num, errs.Wrap(err) +//} +// +//func (c *ConversationGorm) PageConversationIDs(ctx context.Context, pageNumber, showNumber int32) (conversationIDs []string, err error) { +// err = c.db(ctx).Distinct("conversation_id").Limit(int(showNumber)).Offset(int((pageNumber-1)*showNumber)).Pluck("conversation_id", &conversationIDs).Error +// err = errs.Wrap(err) +// return +//} +// +//func (c *ConversationGorm) GetConversationsByConversationID( +// ctx context.Context, +// conversationIDs []string, +//) (conversations []*relation.ConversationModel, err error) { +// return conversations, utils.Wrap( +// c.db(ctx).Where("conversation_id IN (?)", conversationIDs).Find(&conversations).Error, +// "", +// ) +//} +// +//func (c *ConversationGorm) GetConversationIDsNeedDestruct( +// ctx context.Context, +//) (conversations []*relation.ConversationModel, err error) { +// return conversations, utils.Wrap( +// c.db(ctx). +// Where("is_msg_destruct = 1 && msg_destruct_time != 0 && (UNIX_TIMESTAMP(NOW()) > (msg_destruct_time + UNIX_TIMESTAMP(latest_msg_destruct_time)) || latest_msg_destruct_time is NULL)"). +// Find(&conversations). +// Error, +// "", +// ) +//} +// +//func (c *ConversationGorm) GetConversationRecvMsgOpt(ctx context.Context, userID string, conversationID string) (int32, error) { +// var recvMsgOpt int32 +// return recvMsgOpt, errs.Wrap( +// c.db(ctx). +// Model(&relation.ConversationModel{}). +// Where("conversation_id = ? and owner_user_id in ?", conversationID, userID). +// Pluck("recv_msg_opt", &recvMsgOpt). +// Error, +// ) +//} +// +//func (c *ConversationGorm) GetConversationNotReceiveMessageUserIDs(ctx context.Context, conversationID string) ([]string, error) { +// var userIDs []string +// return userIDs, errs.Wrap( +// c.db(ctx). +// Model(&relation.ConversationModel{}). +// Where("conversation_id = ? and recv_msg_opt <> ?", conversationID, constant.ReceiveMessage). +// Pluck("owner_user_id", &userIDs).Error, +// ) +//} diff --git a/pkg/common/db/table/relation/conversation.go b/pkg/common/db/table/relation/conversation.go index 07de85d59..e6e9e249b 100644 --- a/pkg/common/db/table/relation/conversation.go +++ b/pkg/common/db/table/relation/conversation.go @@ -16,6 +16,7 @@ package relation import ( "context" + "github.com/openimsdk/open-im-server/v3/pkg/common/pagination" "time" ) @@ -23,25 +24,46 @@ const ( conversationModelTableName = "conversations" ) +//type ConversationModel struct { +// OwnerUserID string `gorm:"column:owner_user_id;primary_key;type:char(128)" json:"OwnerUserID"` +// ConversationID string `gorm:"column:conversation_id;primary_key;type:char(128)" json:"conversationID"` +// ConversationType int32 `gorm:"column:conversation_type" json:"conversationType"` +// UserID string `gorm:"column:user_id;type:char(64)" json:"userID"` +// GroupID string `gorm:"column:group_id;type:char(128)" json:"groupID"` +// RecvMsgOpt int32 `gorm:"column:recv_msg_opt" json:"recvMsgOpt"` +// IsPinned bool `gorm:"column:is_pinned" json:"isPinned"` +// IsPrivateChat bool `gorm:"column:is_private_chat" json:"isPrivateChat"` +// BurnDuration int32 `gorm:"column:burn_duration;default:30" json:"burnDuration"` +// GroupAtType int32 `gorm:"column:group_at_type" json:"groupAtType"` +// AttachedInfo string `gorm:"column:attached_info;type:varchar(1024)" json:"attachedInfo"` +// Ex string `gorm:"column:ex;type:varchar(1024)" json:"ex"` +// MaxSeq int64 `gorm:"column:max_seq" json:"maxSeq"` +// MinSeq int64 `gorm:"column:min_seq" json:"minSeq"` +// CreateTime time.Time `gorm:"column:create_time;index:create_time;autoCreateTime"` +// IsMsgDestruct bool `gorm:"column:is_msg_destruct;default:false"` +// MsgDestructTime int64 `gorm:"column:msg_destruct_time;default:604800"` +// LatestMsgDestructTime time.Time `gorm:"column:latest_msg_destruct_time;autoCreateTime"` +//} + type ConversationModel struct { - OwnerUserID string `gorm:"column:owner_user_id;primary_key;type:char(128)" json:"OwnerUserID"` - ConversationID string `gorm:"column:conversation_id;primary_key;type:char(128)" json:"conversationID"` - ConversationType int32 `gorm:"column:conversation_type" json:"conversationType"` - UserID string `gorm:"column:user_id;type:char(64)" json:"userID"` - GroupID string `gorm:"column:group_id;type:char(128)" json:"groupID"` - RecvMsgOpt int32 `gorm:"column:recv_msg_opt" json:"recvMsgOpt"` - IsPinned bool `gorm:"column:is_pinned" json:"isPinned"` - IsPrivateChat bool `gorm:"column:is_private_chat" json:"isPrivateChat"` - BurnDuration int32 `gorm:"column:burn_duration;default:30" json:"burnDuration"` - GroupAtType int32 `gorm:"column:group_at_type" json:"groupAtType"` - AttachedInfo string `gorm:"column:attached_info;type:varchar(1024)" json:"attachedInfo"` - Ex string `gorm:"column:ex;type:varchar(1024)" json:"ex"` - MaxSeq int64 `gorm:"column:max_seq" json:"maxSeq"` - MinSeq int64 `gorm:"column:min_seq" json:"minSeq"` - CreateTime time.Time `gorm:"column:create_time;index:create_time;autoCreateTime"` - IsMsgDestruct bool `gorm:"column:is_msg_destruct;default:false"` - MsgDestructTime int64 `gorm:"column:msg_destruct_time;default:604800"` - LatestMsgDestructTime time.Time `gorm:"column:latest_msg_destruct_time;autoCreateTime"` + OwnerUserID string `bson:"owner_user_id"` + ConversationID string `bson:"conversation_id"` + ConversationType int32 `bson:"conversation_type"` + UserID string `bson:"user_id"` + GroupID string `bson:"group_id"` + RecvMsgOpt int32 `bson:"recv_msg_opt"` + IsPinned bool `bson:"is_pinned"` + IsPrivateChat bool `bson:"is_private_chat"` + BurnDuration int32 `bson:"burn_duration"` + GroupAtType int32 `bson:"group_at_type"` + AttachedInfo string `bson:"attached_info"` + Ex string `bson:"ex"` + MaxSeq int64 `bson:"max_seq"` + MinSeq int64 `bson:"min_seq"` + CreateTime time.Time `bson:"create_time"` + IsMsgDestruct bool `bson:"is_msg_destruct"` + MsgDestructTime int64 `bson:"msg_destruct_time"` + LatestMsgDestructTime time.Time `bson:"latest_msg_destruct_time"` } func (ConversationModel) TableName() string { @@ -61,11 +83,11 @@ type ConversationModelInterface interface { FindUserIDAllConversations(ctx context.Context, userID string) (conversations []*ConversationModel, err error) FindRecvMsgNotNotifyUserIDs(ctx context.Context, groupID string) ([]string, error) GetUserRecvMsgOpt(ctx context.Context, ownerUserID, conversationID string) (opt int, err error) - FindSuperGroupRecvMsgNotNotifyUserIDs(ctx context.Context, groupID string) ([]string, error) + //FindSuperGroupRecvMsgNotNotifyUserIDs(ctx context.Context, groupID string) ([]string, error) GetAllConversationIDs(ctx context.Context) ([]string, error) GetAllConversationIDsNumber(ctx context.Context) (int64, error) - PageConversationIDs(ctx context.Context, pageNumber, showNumber int32) (conversationIDs []string, err error) - GetUserAllHasReadSeqs(ctx context.Context, ownerUserID string) (hashReadSeqs map[string]int64, err error) + PageConversationIDs(ctx context.Context, pagination pagination.Pagination) (conversationIDs []string, err error) + //GetUserAllHasReadSeqs(ctx context.Context, ownerUserID string) (hashReadSeqs map[string]int64, err error) GetConversationsByConversationID(ctx context.Context, conversationIDs []string) ([]*ConversationModel, error) GetConversationIDsNeedDestruct(ctx context.Context) ([]*ConversationModel, error) GetConversationNotReceiveMessageUserIDs(ctx context.Context, conversationID string) ([]string, error)