From 5147467618bc5c59faee08a708bb97d5bc9ff4ed Mon Sep 17 00:00:00 2001 From: Gordon <46924906+FGadvancer@users.noreply.github.com> Date: Tue, 10 Oct 2023 18:37:43 +0800 Subject: [PATCH] fix: user status change. --- internal/rpc/user/user.go | 6 ++- pkg/common/db/cache/user.go | 89 ++++++++++++++++++-------------- pkg/common/db/unrelation/user.go | 6 ++- 3 files changed, 60 insertions(+), 41 deletions(-) diff --git a/internal/rpc/user/user.go b/internal/rpc/user/user.go index d22715f0c..a830806e8 100644 --- a/internal/rpc/user/user.go +++ b/internal/rpc/user/user.go @@ -289,7 +289,8 @@ func (s *userServer) SubscribeOrCancelUsersStatus(ctx context.Context, req *pbus } // GetUserStatus Get the online status of the user. -func (s *userServer) GetUserStatus(ctx context.Context, req *pbuser.GetUserStatusReq) (resp *pbuser.GetUserStatusResp, err error) { +func (s *userServer) GetUserStatus(ctx context.Context, req *pbuser.GetUserStatusReq) (resp *pbuser.GetUserStatusResp, + err error) { onlineStatusList, err := s.UserDatabase.GetUserStatus(ctx, req.UserIDs) if err != nil { return nil, err @@ -298,7 +299,8 @@ func (s *userServer) GetUserStatus(ctx context.Context, req *pbuser.GetUserStatu } // SetUserStatus Synchronize user's online status. -func (s *userServer) SetUserStatus(ctx context.Context, req *pbuser.SetUserStatusReq) (resp *pbuser.SetUserStatusResp, err error) { +func (s *userServer) SetUserStatus(ctx context.Context, req *pbuser.SetUserStatusReq) (resp *pbuser.SetUserStatusResp, + err error) { err = s.UserDatabase.SetUserStatus(ctx, req.StatusList) if err != nil { return nil, err diff --git a/pkg/common/db/cache/user.go b/pkg/common/db/cache/user.go index 8c270c6e4..ceed3c370 100644 --- a/pkg/common/db/cache/user.go +++ b/pkg/common/db/cache/user.go @@ -17,6 +17,7 @@ package cache import ( "context" "encoding/json" + "github.com/OpenIMSDK/tools/log" "hash/crc32" "strconv" "time" @@ -205,12 +206,12 @@ func (u *UserCacheRedis) GetUserStatus(ctx context.Context, userIDs []string) ([ // SetUserStatus Set the user status and save it in redis. func (u *UserCacheRedis) SetUserStatus(ctx context.Context, list []*user.OnlineStatus) error { - for _, status := range list { + for _, userStatus := range list { var isNewKey int64 - UserIDNum := crc32.ChecksumIEEE([]byte(status.UserID)) + UserIDNum := crc32.ChecksumIEEE([]byte(userStatus.UserID)) modKey := strconv.Itoa(int(UserIDNum % statusMod)) key := olineStatusKey + modKey - jsonData, err := json.Marshal(status) + jsonData, err := json.Marshal(userStatus) if err != nil { return errs.Wrap(err) } @@ -219,37 +220,44 @@ func (u *UserCacheRedis) SetUserStatus(ctx context.Context, list []*user.OnlineS return errs.Wrap(err) } if isNewKey == 0 { - _, err = u.rdb.HSet(ctx, key, status.UserID, string(jsonData)).Result() - if err != nil { - return errs.Wrap(err) + if userStatus.Status == constant.Online { + _, err = u.rdb.HSet(ctx, key, userStatus.UserID, string(jsonData)).Result() + if err != nil { + return errs.Wrap(err) + } + u.rdb.Expire(ctx, key, userOlineStatusExpireTime) } - u.rdb.Expire(ctx, key, userOlineStatusExpireTime) + } else { - result, err := u.rdb.HGet(ctx, key, status.UserID).Result() + result, err := u.rdb.HGet(ctx, key, userStatus.UserID).Result() if err != nil { - return errs.Wrap(err) + //redis do not have this user key + if err == redis.Nil { + if userStatus.Status == constant.Offline { + log.ZWarn(ctx, "this user not online,maybe trigger order not right", + err, "userStatus", userStatus) + continue + } + } else { + return errs.Wrap(err) + } } var onlineStatus user.OnlineStatus err = json.Unmarshal([]byte(result), &onlineStatus) if err != nil { return errs.Wrap(err) } - onlineStatus.UserID = status.UserID - if status.Status == constant.Offline { + + onlineStatus.UserID = userStatus.UserID + if userStatus.Status == constant.Offline { var newPlatformIDs []int32 for _, val := range onlineStatus.PlatformIDs { - if val != status.PlatformIDs[0] { + if val != userStatus.PlatformIDs[0] { newPlatformIDs = append(newPlatformIDs, val) } } if newPlatformIDs == nil { - onlineStatus.Status = constant.Offline - onlineStatus.PlatformIDs = []int32{} - newjsonData, err := json.Marshal(&onlineStatus) - if err != nil { - return errs.Wrap(err) - } - _, err = u.rdb.HSet(ctx, key, status.UserID, string(newjsonData)).Result() + _, err = u.rdb.HDel(ctx, key, userStatus.UserID).Result() if err != nil { return errs.Wrap(err) } @@ -259,31 +267,36 @@ func (u *UserCacheRedis) SetUserStatus(ctx context.Context, list []*user.OnlineS if err != nil { return errs.Wrap(err) } - _, err = u.rdb.HSet(ctx, key, status.UserID, string(newjsonData)).Result() + _, err = u.rdb.HSet(ctx, key, userStatus.UserID, string(newjsonData)).Result() if err != nil { return errs.Wrap(err) } } } else { - onlineStatus.Status = constant.Online - // Judging whether to be kicked out. - flag := false - for _, val := range onlineStatus.PlatformIDs { - if val == status.PlatformIDs[0] { - flag = true - break + //user all terminal offline directly set online + if onlineStatus.UserID == "" { + onlineStatus.Status = constant.Online + onlineStatus.UserID = userStatus.UserID + onlineStatus.PlatformIDs = userStatus.PlatformIDs + newjsonData, err := json.Marshal(&onlineStatus) + if err != nil { + return errs.Wrap(err) + } + _, err = u.rdb.HSet(ctx, key, userStatus.UserID, string(newjsonData)).Result() + if err != nil { + return errs.Wrap(err) + } + } else { + onlineStatus.Status = constant.Online + onlineStatus.PlatformIDs = append(onlineStatus.PlatformIDs, userStatus.PlatformIDs[0]) + newjsonData, err := json.Marshal(&onlineStatus) + if err != nil { + return errs.Wrap(err) + } + _, err = u.rdb.HSet(ctx, key, userStatus.UserID, string(newjsonData)).Result() + if err != nil { + return errs.Wrap(err) } - } - if !flag { - onlineStatus.PlatformIDs = append(onlineStatus.PlatformIDs, status.PlatformIDs[0]) - } - newjsonData, err := json.Marshal(&onlineStatus) - if err != nil { - return errs.Wrap(err) - } - _, err = u.rdb.HSet(ctx, key, status.UserID, string(newjsonData)).Result() - if err != nil { - return errs.Wrap(err) } } } diff --git a/pkg/common/db/unrelation/user.go b/pkg/common/db/unrelation/user.go index 777f27386..eeedd2771 100644 --- a/pkg/common/db/unrelation/user.go +++ b/pkg/common/db/unrelation/user.go @@ -176,7 +176,11 @@ func (u *UserMongoDriver) GetSubscribedList(ctx context.Context, userID string) bson.M{"user_id": SubscribedPrefix + userID}) err = cursor.Decode(&user) if err != nil { - return nil, errs.Wrap(err) + if err == mongo.ErrNoDocuments { + return []string{}, nil + } else { + return nil, errs.Wrap(err) + } } return user.UserIDList, nil }