diff --git a/internal/msggateway/client.go b/internal/msggateway/client.go index 739fa9688..b1eaaf057 100644 --- a/internal/msggateway/client.go +++ b/internal/msggateway/client.go @@ -207,8 +207,8 @@ func (c *Client) handleMessage(message []byte) error { binaryReq.ReqIdentifier, ) } - c.replyMessage(ctx, &binaryReq, messageErr, resp) - return nil + + return c.replyMessage(ctx, &binaryReq, messageErr, resp) } func (c *Client) setAppBackgroundStatus(ctx context.Context, req Req) ([]byte, error) { @@ -229,7 +229,7 @@ func (c *Client) close() { c.longConnServer.UnRegister(c) } -func (c *Client) replyMessage(ctx context.Context, binaryReq *Req, err error, resp []byte) { +func (c *Client) replyMessage(ctx context.Context, binaryReq *Req, err error, resp []byte) error { errResp := apiresp.ParseError(err) mReply := Resp{ ReqIdentifier: binaryReq.ReqIdentifier, @@ -244,6 +244,10 @@ func (c *Client) replyMessage(ctx context.Context, binaryReq *Req, err error, re if err != nil { log.ZWarn(ctx, "wireBinaryMsg replyMessage", err, "resp", mReply.String()) } + if binaryReq.ReqIdentifier == WsLogoutMsg { + return errors.New("user logout") + } + return nil } func (c *Client) PushMessage(ctx context.Context, msgData *sdkws.MsgData) error { diff --git a/internal/rpc/msg/as_read.go b/internal/rpc/msg/as_read.go index 43dd3cb8c..c31cd02dd 100644 --- a/internal/rpc/msg/as_read.go +++ b/internal/rpc/msg/as_read.go @@ -166,7 +166,7 @@ func (m *msgServer) MarkConversationAsRead( return nil, err } - } else if conversation.ConversationType == constant.GroupChatType { + } else if conversation.ConversationType == constant.SuperGroupChatType { if req.HasReadSeq > hasReadSeq { err = m.MsgDatabase.SetHasReadSeq(ctx, req.UserID, req.ConversationID, req.HasReadSeq) if err != nil { diff --git a/internal/rpc/user/user.go b/internal/rpc/user/user.go index 2c52e3ed2..f2ceb3beb 100644 --- a/internal/rpc/user/user.go +++ b/internal/rpc/user/user.go @@ -305,7 +305,7 @@ func (s *userServer) SetUserStatus(ctx context.Context, req *pbuser.SetUserStatu if err != nil { return nil, err } - list, err := s.UserDatabase.GetAllSubscribeList(ctx, req.UserID) + list, err := s.UserDatabase.GetSubscribedList(ctx, req.UserID) if err != nil { return nil, err } diff --git a/pkg/common/db/controller/user.go b/pkg/common/db/controller/user.go index 63aef72dd..9c6fdc5c4 100644 --- a/pkg/common/db/controller/user.go +++ b/pkg/common/db/controller/user.go @@ -59,6 +59,8 @@ type UserDatabase interface { UnsubscribeUsersStatus(ctx context.Context, userID string, userIDs []string) error // GetAllSubscribeList Get a list of all subscriptions GetAllSubscribeList(ctx context.Context, userID string) ([]string, error) + // GetSubscribedList Get all subscribed lists + GetSubscribedList(ctx context.Context, userID string) ([]string, error) // GetUserStatus Get the online status of the user GetUserStatus(ctx context.Context, userIDs []string) ([]*user.OnlineStatus, error) // SetUserStatus Set the user status and store the user status in redis @@ -199,6 +201,15 @@ func (u *userDatabase) GetAllSubscribeList(ctx context.Context, userID string) ( return list, nil } +// GetSubscribedList Get all subscribed lists. +func (u *userDatabase) GetSubscribedList(ctx context.Context, userID string) ([]string, error) { + list, err := u.mongoDB.GetSubscribedList(ctx, userID) + if err != nil { + return nil, err + } + return list, nil +} + // GetUserStatus get user status. func (u *userDatabase) GetUserStatus(ctx context.Context, userIDs []string) ([]*user.OnlineStatus, error) { onlineStatusList, err := u.cache.GetUserStatus(ctx, userIDs) diff --git a/pkg/common/db/table/unrelation/user.go b/pkg/common/db/table/unrelation/user.go index c1ff15a49..1505829e5 100644 --- a/pkg/common/db/table/unrelation/user.go +++ b/pkg/common/db/table/unrelation/user.go @@ -41,4 +41,6 @@ type UserModelInterface interface { RemoveSubscribedListFromUser(ctx context.Context, userID string, userIDList []string) error // GetAllSubscribeList Get all users subscribed by this user GetAllSubscribeList(ctx context.Context, id string) (userIDList []string, err error) + // GetSubscribedList Get the user subscribed by those users + GetSubscribedList(ctx context.Context, id string) (userIDList []string, err error) } diff --git a/pkg/common/db/unrelation/user.go b/pkg/common/db/unrelation/user.go index cbc22c4d7..4b4a78c79 100644 --- a/pkg/common/db/unrelation/user.go +++ b/pkg/common/db/unrelation/user.go @@ -171,3 +171,20 @@ func (u *UserMongoDriver) GetAllSubscribeList(ctx context.Context, userID string } return user.UserIDList, nil } + +// GetSubscribedList Get the user subscribed by those users. +func (u *UserMongoDriver) GetSubscribedList(ctx context.Context, userID string) (userIDList []string, err error) { + var user unrelation.UserModel + cursor := u.userCollection.FindOne( + ctx, + bson.M{"user_id": SubscribedPrefix + userID}) + err = cursor.Decode(&user) + if err != nil { + if err == mongo.ErrNoDocuments { + return []string{}, nil + } else { + return nil, errs.Wrap(err) + } + } + return user.UserIDList, nil +}