diff --git a/go.mod b/go.mod index 7b45b0048..03ec5bd97 100644 --- a/go.mod +++ b/go.mod @@ -219,3 +219,5 @@ require ( golang.org/x/crypto v0.27.0 // indirect gopkg.in/ini.v1 v1.67.0 // indirect ) + +replace github.com/openimsdk/protocol => github.com/rookiewwj/protocol v0.0.1 diff --git a/go.sum b/go.sum index 354f80189..78af30017 100644 --- a/go.sum +++ b/go.sum @@ -347,8 +347,6 @@ github.com/onsi/gomega v1.25.0 h1:Vw7br2PCDYijJHSfBOWhov+8cAnUf8MfMaIOV323l6Y= github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM= github.com/openimsdk/gomake v0.0.15-alpha.11 h1:PQudYDRESYeYlUYrrLLJhYIlUPO5x7FAx+o5El9U/Bw= github.com/openimsdk/gomake v0.0.15-alpha.11/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI= -github.com/openimsdk/protocol v0.0.73-alpha.14 h1:lv9wNiPRm6G7q74TfpMobKrSfeTaBlZ+Ps3O6UFPmaE= -github.com/openimsdk/protocol v0.0.73-alpha.14/go.mod h1:WF7EuE55vQvpyUAzDXcqg+B+446xQyEba0X35lTINmw= github.com/openimsdk/tools v0.0.50-alpha.97 h1:6ik5w3PpgDG6VjSo3nb3FT/fxN3JX7iIARVxVu9g7VY= github.com/openimsdk/tools v0.0.50-alpha.97/go.mod h1:n2poR3asX1e1XZce4O+MOWAp+X02QJRFvhcLCXZdzRo= github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM= @@ -386,6 +384,8 @@ github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTE github.com/rogpeppe/go-internal v1.8.0/go.mod h1:WmiCO8CzOY8rg0OYDC4/i/2WRWAB6poM+XZ2dLUbcbE= github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8= github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4= +github.com/rookiewwj/protocol v0.0.1 h1:Bd9F8FfE/viObERdhEuRm8tfJkE82dl244WvK1fNF2M= +github.com/rookiewwj/protocol v0.0.1/go.mod h1:WF7EuE55vQvpyUAzDXcqg+B+446xQyEba0X35lTINmw= github.com/rs/xid v1.5.0 h1:mKX4bl4iPYJtEIxp6CYiUuLQ/8DYMoz0PUdtGgMFRVc= github.com/rs/xid v1.5.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg= github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= diff --git a/internal/api/conversation.go b/internal/api/conversation.go index 5a191c0ec..4b2d6356f 100644 --- a/internal/api/conversation.go +++ b/internal/api/conversation.go @@ -73,6 +73,10 @@ func (o *ConversationApi) GetPinnedConversationIDs(c *gin.Context) { a2r.Call(c, conversation.ConversationClient.GetPinnedConversationIDs, o.Client) } +func (o *ConversationApi) GetConversationReadCursors(c *gin.Context) { + a2r.Call(c, conversation.ConversationClient.GetConversationReadCursors, o.Client) +} + func (o *ConversationApi) UpdateConversationsByUser(c *gin.Context) { a2r.Call(c, conversation.ConversationClient.UpdateConversationsByUser, o.Client) } diff --git a/internal/api/router.go b/internal/api/router.go index 1d3a92dd7..268ff56c5 100644 --- a/internal/api/router.go +++ b/internal/api/router.go @@ -277,6 +277,7 @@ func newGinRouter(ctx context.Context, client discovery.SvcDiscoveryRegistry, cf conversationGroup.POST("/get_owner_conversation", c.GetOwnerConversation) conversationGroup.POST("/get_not_notify_conversation_ids", c.GetNotNotifyConversationIDs) conversationGroup.POST("/get_pinned_conversation_ids", c.GetPinnedConversationIDs) + conversationGroup.POST("/get_conversation_read_cursors", c.GetConversationReadCursors) } { diff --git a/internal/rpc/conversation/conversation.go b/internal/rpc/conversation/conversation.go index c9bfd3c56..051ccb266 100644 --- a/internal/rpc/conversation/conversation.go +++ b/internal/rpc/conversation/conversation.go @@ -835,3 +835,65 @@ func (c *conversationServer) setConversationMinSeqAndLatestMsgDestructTime(ctx c c.conversationNotificationSender.ConversationChangeNotification(ctx, ownerUserID, []string{conversationID}) return nil } + +func (c *conversationServer) GetConversationReadCursors(ctx context.Context, req *pbconversation.GetConversationReadCursorsReq) (*pbconversation.GetConversationReadCursorsResp, error) { + if err := req.Check(); err != nil { + return nil, err + } + + conversations, err := c.conversationDatabase.GetConversationsByConversationID(ctx, req.ConversationIDs) + if err != nil { + return nil, err + } + + conversationMap := make(map[string]*dbModel.Conversation) + for _, conv := range conversations { + conversationMap[conv.ConversationID] = conv + } + + result := make(map[string]*pbconversation.ConversationReadCursorList) + for _, conversationID := range req.ConversationIDs { + conversation, exists := conversationMap[conversationID] + if !exists { + continue + } + + var userIDs []string + if conversation.ConversationType == constant.ReadGroupChatType { + if conversation.GroupID == "" { + log.ZWarn(ctx, "groupID is empty for group conversation", nil, "conversationID", conversationID) + result[conversationID] = &pbconversation.ConversationReadCursorList{Cursors: []*pbconversation.ConversationReadCursor{}} + continue + } + userIDs, err = c.groupClient.GetGroupMemberUserIDs(ctx, conversation.GroupID) + if err != nil { + log.ZWarn(ctx, "GetGroupMemberUserIDs failed", err, "conversationID", conversationID, "groupID", conversation.GroupID) + result[conversationID] = &pbconversation.ConversationReadCursorList{Cursors: []*pbconversation.ConversationReadCursor{}} + continue + } + } else { + continue + } + + var cursors []*pbconversation.ConversationReadCursor + for _, userID := range userIDs { + hasReadSeqs, err := c.msgClient.GetHasReadSeqs(ctx, []string{conversationID}, userID) + if err != nil { + log.ZWarn(ctx, "GetHasReadSeqs failed", err, "userID", userID, "conversationID", conversationID) + continue + } + + readSeq := hasReadSeqs[conversationID] + cursors = append(cursors, &pbconversation.ConversationReadCursor{ + UserID: userID, + MaxReadSeq: readSeq, + }) + } + + result[conversationID] = &pbconversation.ConversationReadCursorList{Cursors: cursors} + } + + return &pbconversation.GetConversationReadCursorsResp{ + Cursors: result, + }, nil +} diff --git a/internal/rpc/msg/as_read.go b/internal/rpc/msg/as_read.go index c52ce9c07..af8c2a146 100644 --- a/internal/rpc/msg/as_read.go +++ b/internal/rpc/msg/as_read.go @@ -185,8 +185,7 @@ func (m *msgServer) MarkConversationAsRead(ctx context.Context, req *msg.MarkCon } m.sendMarkAsReadNotification(ctx, req.ConversationID, conversation.ConversationType, req.UserID, m.conversationAndGetRecvID(conversation, req.UserID), seqs, hasReadSeq) - } else if conversation.ConversationType == constant.ReadGroupChatType || - conversation.ConversationType == constant.NotificationChatType { + } else if conversation.ConversationType == constant.NotificationChatType { if req.HasReadSeq > hasReadSeq { err = m.MsgDatabase.SetHasReadSeq(ctx, req.UserID, req.ConversationID, req.HasReadSeq) if err != nil { @@ -194,8 +193,19 @@ func (m *msgServer) MarkConversationAsRead(ctx context.Context, req *msg.MarkCon } hasReadSeq = req.HasReadSeq } + m.sendMarkAsReadNotification(ctx, req.ConversationID, constant.SingleChatType, req.UserID, req.UserID, seqs, hasReadSeq) + } else if conversation.ConversationType == constant.ReadGroupChatType { + if req.HasReadSeq > hasReadSeq { + err = m.MsgDatabase.SetHasReadSeq(ctx, req.UserID, req.ConversationID, req.HasReadSeq) + if err != nil { + return nil, err + } + hasReadSeq = req.HasReadSeq + } + m.sendMarkAsReadNotification(ctx, req.ConversationID, constant.ReadGroupChatType, req.UserID, + conversation.GroupID, seqs, hasReadSeq) } if conversation.ConversationType == constant.SingleChatType {