diff --git a/go.mod b/go.mod index e4bc8edc6..c18eabd08 100644 --- a/go.mod +++ b/go.mod @@ -37,7 +37,7 @@ require ( require github.com/google/uuid v1.3.0 require ( - github.com/OpenIMSDK/protocol v0.0.1 + github.com/OpenIMSDK/protocol v0.0.2 github.com/OpenIMSDK/tools v0.0.5 github.com/aliyun/aliyun-oss-go-sdk v2.2.7+incompatible github.com/go-redis/redis v6.15.9+incompatible diff --git a/go.sum b/go.sum index 0089e1924..a93a1687a 100644 --- a/go.sum +++ b/go.sum @@ -16,8 +16,8 @@ cloud.google.com/go/storage v1.28.1/go.mod h1:Qnisd4CqDdo6BGs2AD5LLnEsmSQ80wQ5og firebase.google.com/go v3.13.0+incompatible h1:3TdYC3DDi6aHn20qoRkxwGqNgdjtblwVAyRLQwGn/+4= firebase.google.com/go v3.13.0+incompatible/go.mod h1:xlah6XbEyW6tbfSklcfe5FHJIwjt8toICdV5Wh9ptHs= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= -github.com/OpenIMSDK/protocol v0.0.1 h1:Q6J1jCU00dfqmguxw2XI+IGcVfBAkb5Tz8LgvyeNkk0= -github.com/OpenIMSDK/protocol v0.0.1/go.mod h1:F25dFrwrIx3lkNoiuf6FkCfxuwf8L4Z8UIsdTHP/r0Y= +github.com/OpenIMSDK/protocol v0.0.2 h1:O53/WiqLCHF9aWPLI32GPF82hn7suM8PkhrtL89Klrw= +github.com/OpenIMSDK/protocol v0.0.2/go.mod h1:F25dFrwrIx3lkNoiuf6FkCfxuwf8L4Z8UIsdTHP/r0Y= github.com/OpenIMSDK/tools v0.0.5 h1:yBVHJ3EpIDcp8VFKPjuGr6MQvFa3t4JByZ+vmeC06/Q= github.com/OpenIMSDK/tools v0.0.5/go.mod h1:eg+q4A34Qmu73xkY0mt37FHGMCMfC6CtmOnm0kFEGFI= github.com/QcloudApi/qcloud_sign_golang v0.0.0-20141224014652-e4130a326409/go.mod h1:1pk82RBxDY/JZnPQrtqHlUFfCctgdorsd9M06fMynOM= diff --git a/internal/api/route.go b/internal/api/route.go index 4a4f92cc0..0f46f0f0c 100644 --- a/internal/api/route.go +++ b/internal/api/route.go @@ -79,6 +79,9 @@ func NewGinRouter(discov discoveryregistry.SvcDiscoveryRegistry, rdb redis.Unive userRouterGroup.POST("/get_users", ParseToken, u.GetUsers) userRouterGroup.POST("/get_users_online_status", ParseToken, u.GetUsersOnlineStatus) userRouterGroup.POST("/get_users_online_token_detail", ParseToken, u.GetUsersOnlineTokenDetail) + userRouterGroup.POST("/subscribe_users_status", ParseToken, u.UnSubscriberStatus) + userRouterGroup.POST("/unsubscribe_users_status", ParseToken, u.UnSubscriberStatus) + } // friend routing group friendRouterGroup := r.Group("/friend", ParseToken) diff --git a/internal/api/user.go b/internal/api/user.go index 8595b3501..108ccac69 100644 --- a/internal/api/user.go +++ b/internal/api/user.go @@ -62,6 +62,7 @@ func (u *UserApi) GetUsers(c *gin.Context) { a2r.Call(user.UserClient.GetPaginationUsers, u.Client, c) } +// GetUsersOnlineStatus Get user online status. func (u *UserApi) GetUsersOnlineStatus(c *gin.Context) { var req msggateway.GetUsersOnlineStatusReq if err := c.BindJSON(&req); err != nil { @@ -95,13 +96,13 @@ func (u *UserApi) GetUsersOnlineStatus(c *gin.Context) { wsResult = append(wsResult, reply.SuccessResult...) } } - // 遍历 api 请求体中的 userIDs + // Traversing the userIDs in the api request body for _, v1 := range req.UserIDs { flag = false res := new(msggateway.GetUsersOnlineStatusResp_SuccessResult) - // 遍历从各个网关中获取的在线结果 + // Iterate through the online results fetched from various gateways for _, v2 := range wsResult { - // 如果匹配上说明在线,反之 + // If matches the above description on the line, and vice versa if v2.UserID == v1 { flag = true res.UserID = v1 @@ -123,6 +124,7 @@ func (u *UserApi) UserRegisterCount(c *gin.Context) { a2r.Call(user.UserClient.UserRegisterCount, u.Client, c) } +// GetUsersOnlineTokenDetail Get user online token details. func (u *UserApi) GetUsersOnlineTokenDetail(c *gin.Context) { var wsResult []*msggateway.GetUsersOnlineStatusResp_SuccessResult var respResult []*msggateway.SingleDetail @@ -182,3 +184,13 @@ func (u *UserApi) GetUsersOnlineTokenDetail(c *gin.Context) { apiresp.GinSuccess(c, respResult) } + +// SubscriberStatus Presence status of subscribed users. +func (u *UserApi) SubscriberStatus(c *gin.Context) { + a2r.Call(user.UserClient.SubscribeOrCancelUsersStatus, u.Client, c) +} + +// UnSubscriberStatus Unsubscribe a user's presence. +func (u *UserApi) UnSubscriberStatus(c *gin.Context) { + a2r.Call(user.UserClient.SubscribeOrCancelUsersStatus, u.Client, c) +} diff --git a/internal/rpc/user/user.go b/internal/rpc/user/user.go index fb6518a7e..576492566 100644 --- a/internal/rpc/user/user.go +++ b/internal/rpc/user/user.go @@ -17,12 +17,11 @@ package user import ( "context" "errors" - "strings" - "time" - "github.com/OpenIMSDK/Open-IM-Server/pkg/authverify" - + "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/unrelation" "github.com/OpenIMSDK/tools/log" + "strings" + "time" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/config" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/convert" @@ -60,6 +59,10 @@ func Start(client registry.SvcDiscoveryRegistry, server *grpc.Server) error { if err != nil { return err } + mongo, err := unrelation.NewMongo() + if err != nil { + return err + } if err := db.AutoMigrate(&tablerelation.UserModel{}); err != nil { return err } @@ -72,7 +75,8 @@ func Start(client registry.SvcDiscoveryRegistry, server *grpc.Server) error { } userDB := relation.NewUserGorm(db) cache := cache.NewUserCacheRedis(rdb, userDB, cache.GetDefaultOpt()) - database := controller.NewUserDatabase(userDB, cache, tx.NewGorm(db)) + userMongoDB := unrelation.NewUserMongoDriver(mongo.GetDatabase()) + database := controller.NewUserDatabase(userDB, cache, tx.NewGorm(db), userMongoDB) friendRpcClient := rpcclient.NewFriendRpcClient(client) msgRpcClient := rpcclient.NewMessageRpcClient(client) u := &userServer{ @@ -235,6 +239,7 @@ func (s *userServer) GetGlobalRecvMessageOpt(ctx context.Context, req *pbuser.Ge return &pbuser.GetGlobalRecvMessageOptResp{GlobalRecvMsgOpt: user[0].GlobalRecvMsgOpt}, nil } +// GetAllUserID Get user account by page. func (s *userServer) GetAllUserID(ctx context.Context, req *pbuser.GetAllUserIDReq) (resp *pbuser.GetAllUserIDResp, err error) { userIDs, err := s.UserDatabase.GetAllUserID(ctx, req.Pagination.PageNumber, req.Pagination.ShowNumber) if err != nil { @@ -243,6 +248,24 @@ func (s *userServer) GetAllUserID(ctx context.Context, req *pbuser.GetAllUserIDR return &pbuser.GetAllUserIDResp{UserIDs: userIDs}, nil } +// SubscribeOrCancelUsersStatus Subscribe online or cancel online users. func (s *userServer) SubscribeOrCancelUsersStatus(ctx context.Context, req *pbuser.SubscribeOrCancelUsersStatusReq) (resp *pbuser.SubscribeOrCancelUsersStatusResp, err error) { + err = s.UserDatabase.SubscribeOrCancelUsersStatus(ctx, req.UserID, req.UserIDs, req.Genre) + if err != nil { + return nil, err + } + //var status map[string][]string + //TODO 获取用户在线列表,返回订阅的用户的在线列表 + + return &pbuser.SubscribeOrCancelUsersStatusResp{}, nil +} + +func (s *userServer) GetUserStatus(ctx context.Context, req *pbuser.GetUserStatusReq) (resp *pbuser.GetUserStatusResp, err error) { + //TODO implement me + panic("implement me") +} + +func (s *userServer) SetUserStatus(ctx context.Context, req *pbuser.SetUserStatusReq) (resp *pbuser.SetUserStatusResp, err error) { + //TODO implement me panic("implement me") } diff --git a/internal/tools/msg.go b/internal/tools/msg.go index 972877516..d13f4597a 100644 --- a/internal/tools/msg.go +++ b/internal/tools/msg.go @@ -82,10 +82,12 @@ func InitMsgTool() (*MsgTool, error) { discov.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials())) userDB := relation.NewUserGorm(db) msgDatabase := controller.InitCommonMsgDatabase(rdb, mongo.GetDatabase()) + userMongoDB := unrelation.NewUserMongoDriver(mongo.GetDatabase()) userDatabase := controller.NewUserDatabase( userDB, cache.NewUserCacheRedis(rdb, relation.NewUserGorm(db), cache.GetDefaultOpt()), tx.NewGorm(db), + userMongoDB, ) groupDatabase := controller.InitGroupDatabase(db, rdb, mongo.GetDatabase()) conversationDatabase := controller.NewConversationDatabase( diff --git a/pkg/common/db/controller/user.go b/pkg/common/db/controller/user.go index 4f9383b09..83942ce22 100644 --- a/pkg/common/db/controller/user.go +++ b/pkg/common/db/controller/user.go @@ -16,6 +16,8 @@ package controller import ( "context" + unRelationTb "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/unrelation" + "github.com/OpenIMSDK/protocol/constant" "time" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/cache" @@ -26,38 +28,45 @@ import ( ) type UserDatabase interface { - // 获取指定用户的信息 如有userID未找到 也返回错误 + // FindWithError Get the information of the specified user. If the userID is not found, it will also return an error FindWithError(ctx context.Context, userIDs []string) (users []*relation.UserModel, err error) - // 获取指定用户的信息 如有userID未找到 不返回错误 + // Find Get the information of the specified user If the userID is not found, no error will be returned Find(ctx context.Context, userIDs []string) (users []*relation.UserModel, err error) - // 插入多条 外部保证userID 不重复 且在db中不存在 + // Create Insert multiple external guarantees that the userID is not repeated and does not exist in the db Create(ctx context.Context, users []*relation.UserModel) (err error) - // 更新(非零值) 外部保证userID存在 + // Update update (non-zero value) external guarantee userID exists Update(ctx context.Context, user *relation.UserModel) (err error) - // 更新(零值) 外部保证userID存在 + // UpdateByMap update (zero value) external guarantee userID exists UpdateByMap(ctx context.Context, userID string, args map[string]interface{}) (err error) - // 如果没找到,不返回错误 + // Page If not found, no error is returned Page(ctx context.Context, pageNumber, showNumber int32) (users []*relation.UserModel, count int64, err error) - // 只要有一个存在就为true + // IsExist true as long as one exists IsExist(ctx context.Context, userIDs []string) (exist bool, err error) - // 获取所有用户ID + // GetAllUserID Get all user IDs GetAllUserID(ctx context.Context, pageNumber, showNumber int32) ([]string, error) - // 函数内部先查询db中是否存在,存在则什么都不做;不存在则插入 + // InitOnce Inside the function, first query whether it exists in the db, if it exists, do nothing; if it does not exist, insert it InitOnce(ctx context.Context, users []*relation.UserModel) (err error) - // 获取用户总数 + // CountTotal Get the total number of users CountTotal(ctx context.Context, before *time.Time) (int64, error) - // 获取范围内用户增量 + // CountRangeEverydayTotal Get the user increment in the range CountRangeEverydayTotal(ctx context.Context, start time.Time, end time.Time) (map[string]int64, error) + //SubscribeOrCancelUsersStatus Subscribe or unsubscribe a user's presence status + SubscribeOrCancelUsersStatus(ctx context.Context, userID string, userIDs []string, genre int32) error + // GetAllSubscribeList Get a list of all subscriptions + GetAllSubscribeList(ctx context.Context, userID string) ([]string, error) + // GetSubscribedList Get all subscribed lists + GetSubscribedList(ctx context.Context, userID string) ([]string, error) } type userDatabase struct { - userDB relation.UserModelInterface - cache cache.UserCache - tx tx.Tx + userDB relation.UserModelInterface + cache cache.UserCache + tx tx.Tx + mongoDB unRelationTb.UserModelInterface } -func NewUserDatabase(userDB relation.UserModelInterface, cache cache.UserCache, tx tx.Tx) UserDatabase { - return &userDatabase{userDB: userDB, cache: cache, tx: tx} +func NewUserDatabase(userDB relation.UserModelInterface, cache cache.UserCache, tx tx.Tx, mongoDB unRelationTb.UserModelInterface) UserDatabase { + return &userDatabase{userDB: userDB, cache: cache, tx: tx, mongoDB: mongoDB} } func (u *userDatabase) InitOnce(ctx context.Context, users []*relation.UserModel) (err error) { @@ -75,7 +84,7 @@ func (u *userDatabase) InitOnce(ctx context.Context, users []*relation.UserModel return nil } -// 获取指定用户的信息 如有userID未找到 也返回错误. +// FindWithError Get the information of the specified user and return an error if the userID is not found. func (u *userDatabase) FindWithError(ctx context.Context, userIDs []string) (users []*relation.UserModel, err error) { users, err = u.cache.GetUsersInfo(ctx, userIDs) if err != nil { @@ -87,13 +96,13 @@ func (u *userDatabase) FindWithError(ctx context.Context, userIDs []string) (use return } -// 获取指定用户的信息 如有userID未找到 不返回错误. +// Find Get the information of the specified user. If the userID is not found, no error will be returned. func (u *userDatabase) Find(ctx context.Context, userIDs []string) (users []*relation.UserModel, err error) { users, err = u.cache.GetUsersInfo(ctx, userIDs) return } -// 插入多条 外部保证userID 不重复 且在db中不存在. +// Create Insert multiple external guarantees that the userID is not repeated and does not exist in the db. func (u *userDatabase) Create(ctx context.Context, users []*relation.UserModel) (err error) { if err := u.tx.Transaction(func(tx any) error { err = u.userDB.Create(ctx, users) @@ -111,7 +120,7 @@ func (u *userDatabase) Create(ctx context.Context, users []*relation.UserModel) return u.cache.DelUsersInfo(userIDs...).ExecDel(ctx) } -// 更新(非零值) 外部保证userID存在. +// Update (non-zero value) externally guarantees that userID exists. func (u *userDatabase) Update(ctx context.Context, user *relation.UserModel) (err error) { if err := u.userDB.Update(ctx, user); err != nil { return err @@ -119,7 +128,7 @@ func (u *userDatabase) Update(ctx context.Context, user *relation.UserModel) (er return u.cache.DelUsersInfo(user.UserID).ExecDel(ctx) } -// 更新(零值) 外部保证userID存在. +// UpdateByMap update (zero value) externally guarantees that userID exists. func (u *userDatabase) UpdateByMap(ctx context.Context, userID string, args map[string]interface{}) (err error) { if err := u.userDB.UpdateByMap(ctx, userID, args); err != nil { return err @@ -127,7 +136,7 @@ func (u *userDatabase) UpdateByMap(ctx context.Context, userID string, args map[ return u.cache.DelUsersInfo(userID).ExecDel(ctx) } -// 获取,如果没找到,不返回错误. +// Page Gets, returns no error if not found. func (u *userDatabase) Page( ctx context.Context, pageNumber, showNumber int32, @@ -135,7 +144,7 @@ func (u *userDatabase) Page( return u.userDB.Page(ctx, pageNumber, showNumber) } -// userIDs是否存在 只要有一个存在就为true. +// IsExist Does userIDs exist? As long as there is one, it will be true. func (u *userDatabase) IsExist(ctx context.Context, userIDs []string) (exist bool, err error) { users, err := u.userDB.Find(ctx, userIDs) if err != nil { @@ -147,18 +156,42 @@ func (u *userDatabase) IsExist(ctx context.Context, userIDs []string) (exist boo return false, nil } +// GetAllUserID Get all user IDs func (u *userDatabase) GetAllUserID(ctx context.Context, pageNumber, showNumber int32) (userIDs []string, err error) { return u.userDB.GetAllUserID(ctx, pageNumber, showNumber) } +// CountTotal Get the total number of users func (u *userDatabase) CountTotal(ctx context.Context, before *time.Time) (count int64, err error) { return u.userDB.CountTotal(ctx, before) } -func (u *userDatabase) CountRangeEverydayTotal( - ctx context.Context, - start time.Time, - end time.Time, -) (map[string]int64, error) { +// CountRangeEverydayTotal Get the user increment in the range +func (u *userDatabase) CountRangeEverydayTotal(ctx context.Context, start time.Time, end time.Time) (map[string]int64, error) { return u.userDB.CountRangeEverydayTotal(ctx, start, end) } + +//SubscribeOrCancelUsersStatus Subscribe or unsubscribe a user's presence status +func (u *userDatabase) SubscribeOrCancelUsersStatus(ctx context.Context, userID string, userIDs []string, genre int32) error { + var err error + if genre == constant.SubscriberUser { + err = u.mongoDB.AddSubscriptionList(ctx, userID, userIDs) + } else if genre == constant.Unsubscribe { + err = u.mongoDB.UnsubscriptionList(ctx, userID, userIDs) + } + return err +} + +// GetAllSubscribeList Get a list of all subscriptions. +func (u *userDatabase) GetAllSubscribeList(ctx context.Context, userID string) ([]string, error) { + + //TODO 获取所有订阅 + return nil, nil +} + +// GetSubscribedList Get all subscribed lists +func (u *userDatabase) GetSubscribedList(ctx context.Context, userID string) ([]string, error) { + + //TODO 获取所有被订阅 + return nil, nil +} diff --git a/pkg/common/db/table/unrelation/user.go b/pkg/common/db/table/unrelation/user.go new file mode 100644 index 000000000..d264da467 --- /dev/null +++ b/pkg/common/db/table/unrelation/user.go @@ -0,0 +1,42 @@ +// Copyright © 2023 OpenIM. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package unrelation + +import "context" + +// SubscribeUser collection constant. +const ( + SubscribeUser = "subscribe_user" +) + +// UserModel collection structure. +type UserModel struct { + UserID string `bson:"user_id" json:"userID"` + UserIDList []string `bson:"user_id_list" json:"userIDList"` +} + +func (UserModel) TableName() string { + return SubscribeUser +} + +// UserModelInterface Operation interface of user mongodb. +type UserModelInterface interface { + // AddSubscriptionList Subscriber's handling of thresholds. + AddSubscriptionList(ctx context.Context, userID string, userIDList []string) error + // UnsubscriptionList Handling of unsubscribe. + UnsubscriptionList(ctx context.Context, userID string, userIDList []string) error + // RemoveSubscribedListFromUser Among the unsubscribed users, delete the user from the subscribed list. + RemoveSubscribedListFromUser(ctx context.Context, userID string, userIDList []string) error +} diff --git a/pkg/common/db/unrelation/user.go b/pkg/common/db/unrelation/user.go new file mode 100644 index 000000000..feec8aa21 --- /dev/null +++ b/pkg/common/db/unrelation/user.go @@ -0,0 +1,141 @@ +// Copyright © 2023 OpenIM. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package unrelation + +import ( + "context" + "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/unrelation" + "github.com/OpenIMSDK/tools/utils" + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" + "log" +) + +// prefixes and suffixes. +const ( + SubscriptionPrefix = "subscription_prefix" + SubscribedPrefix = "subscribed_prefix" +) + +// MaximumSubscription Maximum number of subscriptions. +const ( + MaximumSubscription = 3000 +) + +func NewUserMongoDriver(database *mongo.Database) unrelation.UserModelInterface { + return &UserMongoDriver{ + userCollection: database.Collection(unrelation.SubscribeUser), + } +} + +type UserMongoDriver struct { + userCollection *mongo.Collection +} + +// AddSubscriptionList Subscriber's handling of thresholds. +func (u *UserMongoDriver) AddSubscriptionList(ctx context.Context, userID string, userIDList []string) error { + // Check the number of lists in the key. + filter := bson.M{SubscriptionPrefix + userID: bson.M{"$size": 1}} + result, err := u.userCollection.Find(context.Background(), filter) + if err != nil { + return err + } + var newUserIDList []string + for result.Next(context.Background()) { + err := result.Decode(&newUserIDList) + if err != nil { + log.Fatal(err) + } + } + // If the threshold is exceeded, pop out the previous MaximumSubscription - len(userIDList) and insert it. + if len(newUserIDList)+len(userIDList) > MaximumSubscription { + newUserIDList = newUserIDList[MaximumSubscription-len(userIDList):] + _, err := u.userCollection.UpdateOne( + ctx, + bson.M{"user_id": SubscriptionPrefix + userID}, + bson.M{"$set": bson.M{"user_id_list": newUserIDList}}, + ) + if err != nil { + return err + } + //for i := 1; i <= MaximumSubscription-len(userIDList); i++ { + // _, err := u.userCollection.UpdateOne( + // ctx, + // bson.M{"user_id": SubscriptionPrefix + userID}, + // bson.M{SubscriptionPrefix + userID: bson.M{"$pop": -1}}, + // ) + // if err != nil { + // return err + // } + //} + } + upsert := true + opts := &options.UpdateOptions{ + Upsert: &upsert, + } + _, err = u.userCollection.UpdateOne( + ctx, + bson.M{"user_id": SubscriptionPrefix + userID}, + bson.M{"$addToSet": bson.M{"user_id_list": bson.M{"$each": userIDList}}}, + opts, + ) + if err != nil { + return err + } + for _, user := range userIDList { + _, err = u.userCollection.UpdateOne( + ctx, + bson.M{"user_id": SubscribedPrefix + user}, + bson.M{"$addToSet": bson.M{"user_id_list": userID}}, + opts, + ) + if err != nil { + return utils.Wrap(err, "transaction failed") + } + } + return nil +} + +// UnsubscriptionList Handling of unsubscribe. +func (u *UserMongoDriver) UnsubscriptionList(ctx context.Context, userID string, userIDList []string) error { + _, err := u.userCollection.UpdateOne( + ctx, + bson.M{"user_id": SubscriptionPrefix + userID}, + bson.M{"$pull": bson.M{"user_id_list": bson.M{"$in": userIDList}}}, + ) + if err != nil { + return err + } + err = u.RemoveSubscribedListFromUser(ctx, userID, userIDList) + if err != nil { + return err + } + return nil +} + +// RemoveSubscribedListFromUser Among the unsubscribed users, delete the user from the subscribed list. +func (u *UserMongoDriver) RemoveSubscribedListFromUser(ctx context.Context, userID string, userIDList []string) error { + var newUserIDList []string + for _, value := range userIDList { + newUserIDList = append(newUserIDList, SubscribedPrefix+value) + } + _, err := u.userCollection.UpdateOne( + ctx, + bson.M{"user_id": bson.M{"$in": newUserIDList}}, + bson.M{"$pull": bson.M{"user_id_list": userID}}, + ) + return utils.Wrap(err, "") +}