diff --git a/go.mod b/go.mod index 10c332070..70b4a5eeb 100644 --- a/go.mod +++ b/go.mod @@ -37,7 +37,7 @@ require github.com/google/uuid v1.3.1 require ( github.com/IBM/sarama v1.41.2 - github.com/OpenIMSDK/protocol v0.0.24 + github.com/OpenIMSDK/protocol v0.0.25 github.com/OpenIMSDK/tools v0.0.14 github.com/aliyun/aliyun-oss-go-sdk v2.2.9+incompatible github.com/go-redis/redis v6.15.9+incompatible diff --git a/go.sum b/go.sum index c9792255b..8e18bebbd 100644 --- a/go.sum +++ b/go.sum @@ -18,8 +18,8 @@ firebase.google.com/go v3.13.0+incompatible/go.mod h1:xlah6XbEyW6tbfSklcfe5FHJIw github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/IBM/sarama v1.41.2 h1:ZDBZfGPHAD4uuAtSv4U22fRZBgst0eEwGFzLj0fb85c= github.com/IBM/sarama v1.41.2/go.mod h1:xdpu7sd6OE1uxNdjYTSKUfY8FaKkJES9/+EyjSgiGQk= -github.com/OpenIMSDK/protocol v0.0.24 h1:wk/S0GOGVh8mBbpmjKxSsyYMhyBazdn/ZcS9VqXfT24= -github.com/OpenIMSDK/protocol v0.0.24/go.mod h1:F25dFrwrIx3lkNoiuf6FkCfxuwf8L4Z8UIsdTHP/r0Y= +github.com/OpenIMSDK/protocol v0.0.25 h1:AtB0Ia5LO26oqPoPJDIS4UMH3Wb2li96fMgfzI2cr4I= +github.com/OpenIMSDK/protocol v0.0.25/go.mod h1:F25dFrwrIx3lkNoiuf6FkCfxuwf8L4Z8UIsdTHP/r0Y= github.com/OpenIMSDK/tools v0.0.14 h1:WLof/+WxyPyRST+QkoTKubYCiV73uCLiL8pgnpH/yKQ= github.com/OpenIMSDK/tools v0.0.14/go.mod h1:eg+q4A34Qmu73xkY0mt37FHGMCMfC6CtmOnm0kFEGFI= github.com/QcloudApi/qcloud_sign_golang v0.0.0-20141224014652-e4130a326409/go.mod h1:1pk82RBxDY/JZnPQrtqHlUFfCctgdorsd9M06fMynOM= diff --git a/internal/rpc/user/user.go b/internal/rpc/user/user.go index a830806e8..2c52e3ed2 100644 --- a/internal/rpc/user/user.go +++ b/internal/rpc/user/user.go @@ -301,30 +301,30 @@ func (s *userServer) GetUserStatus(ctx context.Context, req *pbuser.GetUserStatu // SetUserStatus Synchronize user's online status. func (s *userServer) SetUserStatus(ctx context.Context, req *pbuser.SetUserStatusReq) (resp *pbuser.SetUserStatusResp, err error) { - err = s.UserDatabase.SetUserStatus(ctx, req.StatusList) + err = s.UserDatabase.SetUserStatus(ctx, req.UserID, req.Status, req.PlatformID) if err != nil { return nil, err } - for _, value := range req.StatusList { - list, err := s.UserDatabase.GetSubscribedList(ctx, value.UserID) - if err != nil { - return nil, err - } - for _, userID := range list { - tips := &sdkws.UserStatusChangeTips{ - FromUserID: value.UserID, - ToUserID: userID, - Status: value.Status, - PlatformID: value.PlatformIDs[0], - } - s.userNotificationSender.UserStatusChangeNotification(ctx, tips) + list, err := s.UserDatabase.GetAllSubscribeList(ctx, req.UserID) + if err != nil { + return nil, err + } + for _, userID := range list { + tips := &sdkws.UserStatusChangeTips{ + FromUserID: req.UserID, + ToUserID: userID, + Status: req.Status, + PlatformID: req.PlatformID, } + s.userNotificationSender.UserStatusChangeNotification(ctx, tips) } + return &pbuser.SetUserStatusResp{}, nil } // GetSubscribeUsersStatus Get the online status of subscribers. -func (s *userServer) GetSubscribeUsersStatus(ctx context.Context, req *pbuser.GetSubscribeUsersStatusReq) (*pbuser.GetSubscribeUsersStatusResp, error) { +func (s *userServer) GetSubscribeUsersStatus(ctx context.Context, + req *pbuser.GetSubscribeUsersStatusReq) (*pbuser.GetSubscribeUsersStatusResp, error) { userList, err := s.UserDatabase.GetAllSubscribeList(ctx, req.UserID) if err != nil { return nil, err diff --git a/pkg/common/db/cache/user.go b/pkg/common/db/cache/user.go index dfa89fec5..192d5a685 100644 --- a/pkg/common/db/cache/user.go +++ b/pkg/common/db/cache/user.go @@ -53,7 +53,7 @@ type UserCache interface { GetUserGlobalRecvMsgOpt(ctx context.Context, userID string) (opt int, err error) DelUsersGlobalRecvMsgOpt(userIDs ...string) UserCache GetUserStatus(ctx context.Context, userIDs []string) ([]*user.OnlineStatus, error) - SetUserStatus(ctx context.Context, list []*user.OnlineStatus) error + SetUserStatus(ctx context.Context, userID string, status, platformID int32) error } type UserCacheRedis struct { @@ -200,107 +200,98 @@ func (u *UserCacheRedis) GetUserStatus(ctx context.Context, userIDs []string) ([ return nil, errs.Wrap(err) } onlineStatus.UserID = userID + onlineStatus.Status = constant.Online res = append(res, &onlineStatus) } return res, nil } // SetUserStatus Set the user status and save it in redis. -func (u *UserCacheRedis) SetUserStatus(ctx context.Context, list []*user.OnlineStatus) error { - for _, userStatus := range list { - var isNewKey int64 - UserIDNum := crc32.ChecksumIEEE([]byte(userStatus.UserID)) - modKey := strconv.Itoa(int(UserIDNum % statusMod)) - key := olineStatusKey + modKey - jsonData, err := json.Marshal(userStatus) - if err != nil { - return errs.Wrap(err) +func (u *UserCacheRedis) SetUserStatus(ctx context.Context, userID string, status, platformID int32) error { + UserIDNum := crc32.ChecksumIEEE([]byte(userID)) + modKey := strconv.Itoa(int(UserIDNum % statusMod)) + key := olineStatusKey + modKey + + isNewKey, err := u.rdb.Exists(ctx, key).Result() + if err != nil { + return errs.Wrap(err) + } + if isNewKey == 0 { + if status == constant.Online { + onlineStatus := user.OnlineStatus{ + UserID: userID, + Status: constant.Online, + PlatformIDs: []int32{platformID}, + } + jsonData, err := json.Marshal(onlineStatus) + if err != nil { + return errs.Wrap(err) + } + _, err = u.rdb.HSet(ctx, key, userID, string(jsonData)).Result() + if err != nil { + return errs.Wrap(err) + } + u.rdb.Expire(ctx, key, userOlineStatusExpireTime) + return nil } - isNewKey, err = u.rdb.Exists(ctx, key).Result() - if err != nil { + } + + isNil := false + result, err := u.rdb.HGet(ctx, key, userID).Result() + if err != nil { + if err == redis.Nil { + isNil = true + } else { return errs.Wrap(err) } - if isNewKey == 0 { - if userStatus.Status == constant.Online { - _, err = u.rdb.HSet(ctx, key, userStatus.UserID, string(jsonData)).Result() - if err != nil { - return errs.Wrap(err) - } - u.rdb.Expire(ctx, key, userOlineStatusExpireTime) + } + var onlineStatus user.OnlineStatus + err = json.Unmarshal([]byte(result), &onlineStatus) + if err != nil { + return errs.Wrap(err) + } + if status == constant.Offline { + if isNil { + log.ZWarn(ctx, "this user not online,maybe trigger order not right", + err, "userStatus", status) + return nil + } + var newPlatformIDs []int32 + for _, val := range onlineStatus.PlatformIDs { + if val != platformID { + newPlatformIDs = append(newPlatformIDs, val) } - - } else { - result, err := u.rdb.HGet(ctx, key, userStatus.UserID).Result() + } + if newPlatformIDs == nil { + _, err = u.rdb.HDel(ctx, key, userID).Result() if err != nil { - //redis do not have this user key - if err == redis.Nil { - if userStatus.Status == constant.Offline { - log.ZWarn(ctx, "this user not online,maybe trigger order not right", - err, "userStatus", userStatus) - continue - } - } else { - return errs.Wrap(err) - } + return errs.Wrap(err) } - var onlineStatus user.OnlineStatus - err = json.Unmarshal([]byte(result), &onlineStatus) + } else { + onlineStatus.PlatformIDs = newPlatformIDs + newjsonData, err := json.Marshal(&onlineStatus) if err != nil { return errs.Wrap(err) } - - onlineStatus.UserID = userStatus.UserID - if userStatus.Status == constant.Offline { - var newPlatformIDs []int32 - for _, val := range onlineStatus.PlatformIDs { - if val != userStatus.PlatformIDs[0] { - newPlatformIDs = append(newPlatformIDs, val) - } - } - if newPlatformIDs == nil { - _, err = u.rdb.HDel(ctx, key, userStatus.UserID).Result() - if err != nil { - return errs.Wrap(err) - } - } else { - onlineStatus.PlatformIDs = newPlatformIDs - newjsonData, err := json.Marshal(&onlineStatus) - if err != nil { - return errs.Wrap(err) - } - _, err = u.rdb.HSet(ctx, key, userStatus.UserID, string(newjsonData)).Result() - if err != nil { - return errs.Wrap(err) - } - } - } else { - //user all terminal offline directly set online - if onlineStatus.UserID == "" { - onlineStatus.Status = constant.Online - onlineStatus.UserID = userStatus.UserID - onlineStatus.PlatformIDs = userStatus.PlatformIDs - newjsonData, err := json.Marshal(&onlineStatus) - if err != nil { - return errs.Wrap(err) - } - _, err = u.rdb.HSet(ctx, key, userStatus.UserID, string(newjsonData)).Result() - if err != nil { - return errs.Wrap(err) - } - } else { - onlineStatus.Status = constant.Online - onlineStatus.PlatformIDs = append(onlineStatus.PlatformIDs, userStatus.PlatformIDs[0]) - newjsonData, err := json.Marshal(&onlineStatus) - if err != nil { - return errs.Wrap(err) - } - _, err = u.rdb.HSet(ctx, key, userStatus.UserID, string(newjsonData)).Result() - if err != nil { - return errs.Wrap(err) - } - } + _, err = u.rdb.HSet(ctx, key, userID, string(newjsonData)).Result() + if err != nil { + return errs.Wrap(err) } } + } else { + onlineStatus.Status = constant.Online + onlineStatus.UserID = userID + onlineStatus.PlatformIDs = append(onlineStatus.PlatformIDs, platformID) + newjsonData, err := json.Marshal(&onlineStatus) + if err != nil { + return errs.Wrap(err) + } + _, err = u.rdb.HSet(ctx, key, userID, string(newjsonData)).Result() + if err != nil { + return errs.Wrap(err) + } + } + return nil } diff --git a/pkg/common/db/controller/user.go b/pkg/common/db/controller/user.go index ab86cfd27..63aef72dd 100644 --- a/pkg/common/db/controller/user.go +++ b/pkg/common/db/controller/user.go @@ -59,12 +59,10 @@ type UserDatabase interface { UnsubscribeUsersStatus(ctx context.Context, userID string, userIDs []string) error // GetAllSubscribeList Get a list of all subscriptions GetAllSubscribeList(ctx context.Context, userID string) ([]string, error) - // GetSubscribedList Get all subscribed lists - GetSubscribedList(ctx context.Context, userID string) ([]string, error) // GetUserStatus Get the online status of the user GetUserStatus(ctx context.Context, userIDs []string) ([]*user.OnlineStatus, error) // SetUserStatus Set the user status and store the user status in redis - SetUserStatus(ctx context.Context, list []*user.OnlineStatus) error + SetUserStatus(ctx context.Context, userID string, status, platformID int32) error } type userDatabase struct { @@ -201,15 +199,6 @@ func (u *userDatabase) GetAllSubscribeList(ctx context.Context, userID string) ( return list, nil } -// GetSubscribedList Get all subscribed lists. -func (u *userDatabase) GetSubscribedList(ctx context.Context, userID string) ([]string, error) { - list, err := u.mongoDB.GetSubscribedList(ctx, userID) - if err != nil { - return nil, err - } - return list, nil -} - // GetUserStatus get user status. func (u *userDatabase) GetUserStatus(ctx context.Context, userIDs []string) ([]*user.OnlineStatus, error) { onlineStatusList, err := u.cache.GetUserStatus(ctx, userIDs) @@ -217,6 +206,6 @@ func (u *userDatabase) GetUserStatus(ctx context.Context, userIDs []string) ([]* } // SetUserStatus Set the user status and save it in redis. -func (u *userDatabase) SetUserStatus(ctx context.Context, list []*user.OnlineStatus) error { - return u.cache.SetUserStatus(ctx, list) +func (u *userDatabase) SetUserStatus(ctx context.Context, userID string, status, platformID int32) error { + return u.cache.SetUserStatus(ctx, userID, status, platformID) } diff --git a/pkg/common/db/table/unrelation/user.go b/pkg/common/db/table/unrelation/user.go index 1505829e5..c1ff15a49 100644 --- a/pkg/common/db/table/unrelation/user.go +++ b/pkg/common/db/table/unrelation/user.go @@ -41,6 +41,4 @@ type UserModelInterface interface { RemoveSubscribedListFromUser(ctx context.Context, userID string, userIDList []string) error // GetAllSubscribeList Get all users subscribed by this user GetAllSubscribeList(ctx context.Context, id string) (userIDList []string, err error) - // GetSubscribedList Get the user subscribed by those users - GetSubscribedList(ctx context.Context, id string) (userIDList []string, err error) } diff --git a/pkg/common/db/unrelation/user.go b/pkg/common/db/unrelation/user.go index eeedd2771..cbc22c4d7 100644 --- a/pkg/common/db/unrelation/user.go +++ b/pkg/common/db/unrelation/user.go @@ -162,19 +162,6 @@ func (u *UserMongoDriver) GetAllSubscribeList(ctx context.Context, userID string ctx, bson.M{"user_id": SubscriptionPrefix + userID}) err = cursor.Decode(&user) - if err != nil { - return nil, errs.Wrap(err) - } - return user.UserIDList, nil -} - -// GetSubscribedList Get the user subscribed by those users. -func (u *UserMongoDriver) GetSubscribedList(ctx context.Context, userID string) (userIDList []string, err error) { - var user unrelation.UserModel - cursor := u.userCollection.FindOne( - ctx, - bson.M{"user_id": SubscribedPrefix + userID}) - err = cursor.Decode(&user) if err != nil { if err == mongo.ErrNoDocuments { return []string{}, nil diff --git a/pkg/rpcclient/user.go b/pkg/rpcclient/user.go index 6929b4fd9..c40d95727 100644 --- a/pkg/rpcclient/user.go +++ b/pkg/rpcclient/user.go @@ -173,6 +173,9 @@ func (u *UserRpcClient) GetAllUserIDs(ctx context.Context, pageNumber, showNumbe // SetUserStatus sets the status for a user based on the provided user ID, status, and platform ID. func (u *UserRpcClient) SetUserStatus(ctx context.Context, userID string, status int32, platformID int) error { - _, err := u.Client.SetUserStatus(ctx, &user.SetUserStatusReq{StatusList: []*user.OnlineStatus{{UserID: userID, Status: status, PlatformIDs: []int32{int32(platformID)}}}}) + _, err := u.Client.SetUserStatus(ctx, &user.SetUserStatusReq{ + UserID: userID, + Status: status, PlatformID: int32(platformID), + }) return err }