diff --git a/internal/rpc/user/online.go b/internal/rpc/user/online.go index e853ceae2..99b272006 100644 --- a/internal/rpc/user/online.go +++ b/internal/rpc/user/online.go @@ -3,7 +3,6 @@ package user import ( "context" "github.com/openimsdk/protocol/constant" - "github.com/openimsdk/protocol/sdkws" pbuser "github.com/openimsdk/protocol/user" ) @@ -38,23 +37,6 @@ func (s *userServer) getUsersOnlineStatus(ctx context.Context, userIDs []string) // SubscribeOrCancelUsersStatus Subscribe online or cancel online users. func (s *userServer) SubscribeOrCancelUsersStatus(ctx context.Context, req *pbuser.SubscribeOrCancelUsersStatusReq) (*pbuser.SubscribeOrCancelUsersStatusResp, error) { - if req.Genre == constant.SubscriberUser { - err := s.db.SubscribeUsersStatus(ctx, req.UserID, req.UserIDs) - if err != nil { - return nil, err - } - var status []*pbuser.OnlineStatus - status, err = s.getUsersOnlineStatus(ctx, req.UserIDs) - if err != nil { - return nil, err - } - return &pbuser.SubscribeOrCancelUsersStatusResp{StatusList: status}, nil - } else if req.Genre == constant.Unsubscribe { - err := s.db.UnsubscribeUsersStatus(ctx, req.UserID, req.UserIDs) - if err != nil { - return nil, err - } - } return &pbuser.SubscribeOrCancelUsersStatusResp{}, nil } @@ -82,34 +64,12 @@ func (s *userServer) SetUserStatus(ctx context.Context, req *pbuser.SetUserStatu if err := s.online.SetUserOnline(ctx, req.UserID, online, offline); err != nil { return nil, err } - list, err := s.db.GetSubscribedList(ctx, req.UserID) - if err != nil { - return nil, err - } - for _, userID := range list { - tips := &sdkws.UserStatusChangeTips{ - FromUserID: req.UserID, - ToUserID: userID, - Status: req.Status, - PlatformID: req.PlatformID, - } - s.userNotificationSender.UserStatusChangeNotification(ctx, tips) - } - return &pbuser.SetUserStatusResp{}, nil } // GetSubscribeUsersStatus Get the online status of subscribers. func (s *userServer) GetSubscribeUsersStatus(ctx context.Context, req *pbuser.GetSubscribeUsersStatusReq) (*pbuser.GetSubscribeUsersStatusResp, error) { - userList, err := s.db.GetAllSubscribeList(ctx, req.UserID) - if err != nil { - return nil, err - } - onlineStatusList, err := s.getUsersOnlineStatus(ctx, userList) - if err != nil { - return nil, err - } - return &pbuser.GetSubscribeUsersStatusResp{StatusList: onlineStatusList}, nil + return &pbuser.GetSubscribeUsersStatusResp{}, nil } func (s *userServer) SetUserOnlineStatus(ctx context.Context, req *pbuser.SetUserOnlineStatusReq) (*pbuser.SetUserOnlineStatusResp, error) { diff --git a/internal/rpc/user/user.go b/internal/rpc/user/user.go index 0b96077ec..779d9b0c4 100644 --- a/internal/rpc/user/user.go +++ b/internal/rpc/user/user.go @@ -93,8 +93,7 @@ func Start(ctx context.Context, config *Config, client registry.SvcDiscoveryRegi return err } userCache := redis.NewUserCacheRedis(rdb, &config.LocalCacheConfig, userDB, redis.GetRocksCacheOptions()) - userMongoDB := mgo.NewUserMongoDriver(mgocli.GetDB()) - database := controller.NewUserDatabase(userDB, userCache, mgocli.GetTx(), userMongoDB) + database := controller.NewUserDatabase(userDB, userCache, mgocli.GetTx()) friendRpcClient := rpcclient.NewFriendRpcClient(client, config.Share.RpcRegisterName.Friend) groupRpcClient := rpcclient.NewGroupRpcClient(client, config.Share.RpcRegisterName.Group) msgRpcClient := rpcclient.NewMessageRpcClient(client, config.Share.RpcRegisterName.Msg) diff --git a/pkg/common/storage/controller/user.go b/pkg/common/storage/controller/user.go index 59559537b..5ce8104e7 100644 --- a/pkg/common/storage/controller/user.go +++ b/pkg/common/storage/controller/user.go @@ -62,14 +62,6 @@ type UserDatabase interface { CountRangeEverydayTotal(ctx context.Context, start time.Time, end time.Time) (map[string]int64, error) SortQuery(ctx context.Context, userIDName map[string]string, asc bool) ([]*model.User, error) - // SubscribeUsersStatus Subscribe a user's presence status - SubscribeUsersStatus(ctx context.Context, userID string, userIDs []string) error - // UnsubscribeUsersStatus unsubscribe a user's presence status - UnsubscribeUsersStatus(ctx context.Context, userID string, userIDs []string) error - // GetAllSubscribeList Get a list of all subscriptions - GetAllSubscribeList(ctx context.Context, userID string) ([]string, error) - // GetSubscribedList Get all subscribed lists - GetSubscribedList(ctx context.Context, userID string) ([]string, error) // CRUD user command AddUserCommand(ctx context.Context, userID string, Type int32, UUID string, value string, ex string) error @@ -80,14 +72,13 @@ type UserDatabase interface { } type userDatabase struct { - tx tx.Tx - userDB database.User - cache cache.UserCache - mongoDB database.SubscribeUser + tx tx.Tx + userDB database.User + cache cache.UserCache } -func NewUserDatabase(userDB database.User, cache cache.UserCache, tx tx.Tx, mongoDB database.SubscribeUser) UserDatabase { - return &userDatabase{userDB: userDB, cache: cache, tx: tx, mongoDB: mongoDB} +func NewUserDatabase(userDB database.User, cache cache.UserCache, tx tx.Tx) UserDatabase { + return &userDatabase{userDB: userDB, cache: cache, tx: tx} } func (u *userDatabase) InitOnce(ctx context.Context, users []*model.User) error { @@ -212,36 +203,6 @@ func (u *userDatabase) SortQuery(ctx context.Context, userIDName map[string]stri return u.userDB.SortQuery(ctx, userIDName, asc) } -// SubscribeUsersStatus Subscribe or unsubscribe a user's presence status. -func (u *userDatabase) SubscribeUsersStatus(ctx context.Context, userID string, userIDs []string) error { - err := u.mongoDB.AddSubscriptionList(ctx, userID, userIDs) - return err -} - -// UnsubscribeUsersStatus unsubscribe a user's presence status. -func (u *userDatabase) UnsubscribeUsersStatus(ctx context.Context, userID string, userIDs []string) error { - err := u.mongoDB.UnsubscriptionList(ctx, userID, userIDs) - return err -} - -// GetAllSubscribeList Get a list of all subscriptions. -func (u *userDatabase) GetAllSubscribeList(ctx context.Context, userID string) ([]string, error) { - list, err := u.mongoDB.GetAllSubscribeList(ctx, userID) - if err != nil { - return nil, err - } - return list, nil -} - -// GetSubscribedList Get all subscribed lists. -func (u *userDatabase) GetSubscribedList(ctx context.Context, userID string) ([]string, error) { - list, err := u.mongoDB.GetSubscribedList(ctx, userID) - if err != nil { - return nil, err - } - return list, nil -} - func (u *userDatabase) AddUserCommand(ctx context.Context, userID string, Type int32, UUID string, value string, ex string) error { return u.userDB.AddUserCommand(ctx, userID, Type, UUID, value, ex) } diff --git a/pkg/common/storage/database/mgo/subscribe.go b/pkg/common/storage/database/mgo/subscribe.go deleted file mode 100644 index 5b7d9786b..000000000 --- a/pkg/common/storage/database/mgo/subscribe.go +++ /dev/null @@ -1,189 +0,0 @@ -// Copyright © 2023 OpenIM. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package mgo - -import ( - "context" - "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" - "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" - - "github.com/openimsdk/tools/errs" - "go.mongodb.org/mongo-driver/bson" - "go.mongodb.org/mongo-driver/mongo" - "go.mongodb.org/mongo-driver/mongo/options" -) - -// prefixes and suffixes. -const ( - SubscriptionPrefix = "subscription_prefix" - SubscribedPrefix = "subscribed_prefix" -) - -// MaximumSubscription Maximum number of subscriptions. -const ( - MaximumSubscription = 3000 -) - -func NewUserMongoDriver(database *mongo.Database) database.SubscribeUser { - return &UserMongoDriver{ - userCollection: database.Collection(model.SubscribeUserTableName), - } -} - -type UserMongoDriver struct { - userCollection *mongo.Collection -} - -// AddSubscriptionList Subscriber's handling of thresholds. -func (u *UserMongoDriver) AddSubscriptionList(ctx context.Context, userID string, userIDList []string) error { - // Check the number of lists in the key. - pipeline := mongo.Pipeline{ - {{"$match", bson.D{{"user_id", SubscriptionPrefix + userID}}}}, - {{"$project", bson.D{{"count", bson.D{{"$size", "$user_id_list"}}}}}}, - } - // perform aggregate operations - cursor, err := u.userCollection.Aggregate(ctx, pipeline) - if err != nil { - return errs.Wrap(err) - } - defer cursor.Close(ctx) - var cnt struct { - Count int `bson:"count"` - } - // iterate over aggregated results - for cursor.Next(ctx) { - err = cursor.Decode(&cnt) - if err != nil { - return errs.Wrap(err) - } - } - var newUserIDList []string - // If the threshold is exceeded, pop out the previous MaximumSubscription - len(userIDList) and insert it. - if cnt.Count+len(userIDList) > MaximumSubscription { - newUserIDList, err = u.GetAllSubscribeList(ctx, userID) - if err != nil { - return err - } - newUserIDList = newUserIDList[MaximumSubscription-len(userIDList):] - _, err = u.userCollection.UpdateOne( - ctx, - bson.M{"user_id": SubscriptionPrefix + userID}, - bson.M{"$set": bson.M{"user_id_list": newUserIDList}}, - ) - if err != nil { - return err - } - // Another way to subscribe to N before pop,Delete after testing - /*for i := 1; i <= MaximumSubscription-len(userIDList); i++ { - _, err := u.userCollection.UpdateOne( - ctx, - bson.M{"user_id": SubscriptionPrefix + userID}, - bson.M{SubscriptionPrefix + userID: bson.M{"$pop": -1}}, - ) - if err != nil { - return err - } - }*/ - } - upsert := true - opts := &options.UpdateOptions{ - Upsert: &upsert, - } - _, err = u.userCollection.UpdateOne( - ctx, - bson.M{"user_id": SubscriptionPrefix + userID}, - bson.M{"$addToSet": bson.M{"user_id_list": bson.M{"$each": userIDList}}}, - opts, - ) - if err != nil { - return errs.Wrap(err) - } - for _, user := range userIDList { - _, err = u.userCollection.UpdateOne( - ctx, - bson.M{"user_id": SubscribedPrefix + user}, - bson.M{"$addToSet": bson.M{"user_id_list": userID}}, - opts, - ) - if err != nil { - return errs.WrapMsg(err, "transaction failed") - } - } - return nil -} - -// UnsubscriptionList Handling of unsubscribe. -func (u *UserMongoDriver) UnsubscriptionList(ctx context.Context, userID string, userIDList []string) error { - _, err := u.userCollection.UpdateOne( - ctx, - bson.M{"user_id": SubscriptionPrefix + userID}, - bson.M{"$pull": bson.M{"user_id_list": bson.M{"$in": userIDList}}}, - ) - if err != nil { - return errs.Wrap(err) - } - err = u.RemoveSubscribedListFromUser(ctx, userID, userIDList) - if err != nil { - return errs.Wrap(err) - } - return nil -} - -// RemoveSubscribedListFromUser Among the unsubscribed users, delete the user from the subscribed list. -func (u *UserMongoDriver) RemoveSubscribedListFromUser(ctx context.Context, userID string, userIDList []string) error { - var err error - for _, userIDTemp := range userIDList { - _, err = u.userCollection.UpdateOne( - ctx, - bson.M{"user_id": SubscribedPrefix + userIDTemp}, - bson.M{"$pull": bson.M{"user_id_list": userID}}, - ) - } - return errs.Wrap(err) -} - -// GetAllSubscribeList Get all users subscribed by this user. -func (u *UserMongoDriver) GetAllSubscribeList(ctx context.Context, userID string) (userIDList []string, err error) { - var user model.SubscribeUser - cursor := u.userCollection.FindOne( - ctx, - bson.M{"user_id": SubscriptionPrefix + userID}) - err = cursor.Decode(&user) - if err != nil { - if err == mongo.ErrNoDocuments { - return []string{}, nil - } else { - return nil, errs.Wrap(err) - } - } - return user.UserIDList, nil -} - -// GetSubscribedList Get the user subscribed by those users. -func (u *UserMongoDriver) GetSubscribedList(ctx context.Context, userID string) (userIDList []string, err error) { - var user model.SubscribeUser - cursor := u.userCollection.FindOne( - ctx, - bson.M{"user_id": SubscribedPrefix + userID}) - err = cursor.Decode(&user) - if err != nil { - if err == mongo.ErrNoDocuments { - return []string{}, nil - } else { - return nil, errs.Wrap(err) - } - } - return user.UserIDList, nil -} diff --git a/pkg/common/storage/database/subscribe.go b/pkg/common/storage/database/subscribe.go deleted file mode 100644 index 5905ecd07..000000000 --- a/pkg/common/storage/database/subscribe.go +++ /dev/null @@ -1,31 +0,0 @@ -// Copyright © 2023 OpenIM. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package database - -import "context" - -// SubscribeUser Operation interface of user mongodb. -type SubscribeUser interface { - // AddSubscriptionList Subscriber's handling of thresholds. - AddSubscriptionList(ctx context.Context, userID string, userIDList []string) error - // UnsubscriptionList Handling of unsubscribe. - UnsubscriptionList(ctx context.Context, userID string, userIDList []string) error - // RemoveSubscribedListFromUser Among the unsubscribed users, delete the user from the subscribed list. - RemoveSubscribedListFromUser(ctx context.Context, userID string, userIDList []string) error - // GetAllSubscribeList Get all users subscribed by this user - GetAllSubscribeList(ctx context.Context, id string) (userIDList []string, err error) - // GetSubscribedList Get the user subscribed by those users - GetSubscribedList(ctx context.Context, id string) (userIDList []string, err error) -}