From 31f5a4e51c1be2b1d356774107597c9f5b23ac90 Mon Sep 17 00:00:00 2001 From: withchao <993506633@qq.com> Date: Wed, 25 Sep 2024 18:51:05 +0800 Subject: [PATCH] feat: jssdk GetConversations, GetActiveConversation --- internal/api/{ => jssdk}/jssdk.go | 140 +++++++++++---------------- internal/api/jssdk/sort.go | 33 +++++++ internal/api/jssdk/stu.go | 22 +++++ internal/api/jssdk/tools.go | 26 +++++ internal/api/router.go | 7 +- pkg/common/storage/controller/msg.go | 1 + 6 files changed, 145 insertions(+), 84 deletions(-) rename internal/api/{ => jssdk}/jssdk.go (62%) create mode 100644 internal/api/jssdk/sort.go create mode 100644 internal/api/jssdk/stu.go create mode 100644 internal/api/jssdk/tools.go diff --git a/internal/api/jssdk.go b/internal/api/jssdk/jssdk.go similarity index 62% rename from internal/api/jssdk.go rename to internal/api/jssdk/jssdk.go index adb38778d..7f136c74c 100644 --- a/internal/api/jssdk.go +++ b/internal/api/jssdk/jssdk.go @@ -1,20 +1,20 @@ -package api +package jssdk import ( - "context" "github.com/gin-gonic/gin" "github.com/openimsdk/protocol/conversation" "github.com/openimsdk/protocol/msg" "github.com/openimsdk/protocol/sdkws" "github.com/openimsdk/tools/a2r" - "github.com/openimsdk/tools/apiresp" "github.com/openimsdk/tools/mcontext" "github.com/openimsdk/tools/utils/datautil" - "google.golang.org/grpc" "sort" ) -const limitGetActiveConversation = 100 +const ( + maxGetActiveConversation = 500 + defaultGetActiveConversation = 100 +) func NewJSSdkApi(msg msg.MsgClient, conv conversation.ConversationClient) *JSSdk { return &JSSdk{ @@ -28,15 +28,22 @@ type JSSdk struct { conv conversation.ConversationClient } -func (x *JSSdk) GetActiveConversation(c *gin.Context) { - call(c, x.getActiveConversation) +func (x *JSSdk) GetActiveConversations(c *gin.Context) { + call(c, x.getActiveConversations) } func (x *JSSdk) GetConversations(c *gin.Context) { call(c, x.getConversations) } -func (x *JSSdk) getActiveConversation(ctx *gin.Context) ([]ConversationMsg, error) { +func (x *JSSdk) getActiveConversations(ctx *gin.Context) (*ConversationsResp, error) { + req, err := a2r.ParseRequest[ActiveConversationsReq](ctx) + if err != nil { + return nil, err + } + if req.Count <= 0 || req.Count > maxGetActiveConversation { + req.Count = defaultGetActiveConversation + } opUserID := mcontext.GetOpUserID(ctx) conversationIDs, err := field(ctx, x.conv.GetConversationIDs, &conversation.GetConversationIDsReq{UserID: opUserID}, (*conversation.GetConversationIDsResp).GetConversationIDs) @@ -44,7 +51,12 @@ func (x *JSSdk) getActiveConversation(ctx *gin.Context) ([]ConversationMsg, erro return nil, err } if len(conversationIDs) == 0 { - return nil, nil + return &ConversationsResp{}, nil + } + readSeq, err := field(ctx, x.msg.GetHasReadSeqs, + &msg.GetHasReadSeqsReq{UserID: opUserID, ConversationIDs: conversationIDs}, (*msg.SeqsInfoResp).GetMaxSeqs) + if err != nil { + return nil, err } activeConversation, err := field(ctx, x.msg.GetActiveConversation, &msg.GetActiveConversationReq{ConversationIDs: conversationIDs}, (*msg.GetActiveConversationResp).GetConversations) @@ -52,7 +64,7 @@ func (x *JSSdk) getActiveConversation(ctx *gin.Context) ([]ConversationMsg, erro return nil, err } if len(activeConversation) == 0 { - return nil, nil + return &ConversationsResp{}, nil } sortConversations := sortActiveConversations{ Conversation: activeConversation, @@ -66,7 +78,7 @@ func (x *JSSdk) getActiveConversation(ctx *gin.Context) ([]ConversationMsg, erro sortConversations.PinnedConversationIDs = datautil.SliceSet(pinnedConversationIDs) } sort.Sort(&sortConversations) - sortList := sortConversations.Top(limitGetActiveConversation) + sortList := sortConversations.Top(req.Count) conversations, err := field(ctx, x.conv.GetConversations, &conversation.GetConversationsReq{ OwnerUserID: opUserID, @@ -76,11 +88,6 @@ func (x *JSSdk) getActiveConversation(ctx *gin.Context) ([]ConversationMsg, erro if err != nil { return nil, err } - //readSeq, err := field(ctx, x.msg.GetHasReadSeqs, - // &msg.GetHasReadSeqsReq{UserID: opUserID, ConversationIDs: conversationIDs}, (*msg.SeqsInfoResp).GetMaxSeqs) - //if err != nil { - // return nil, err - //} msgs, err := field(ctx, x.msg.GetSeqMessage, &msg.GetSeqMessageReq{ UserID: opUserID, @@ -110,15 +117,24 @@ func (x *JSSdk) getActiveConversation(ctx *gin.Context) ([]ConversationMsg, erro resp = append(resp, ConversationMsg{ Conversation: conv, LastMsg: lastMsg, - //MaxSeq: c.MaxSeq, - //MaxSeqTime: c.LastTime, - //ReadSeq: readSeq[c.ConversationID], + MaxSeq: c.MaxSeq, + ReadSeq: readSeq[c.ConversationID], }) } - return resp, nil + var unreadCount int64 + for _, c := range activeConversation { + count := c.MaxSeq - readSeq[c.ConversationID] + if count > 0 { + unreadCount += count + } + } + return &ConversationsResp{ + Conversations: resp, + UnreadCount: unreadCount, + }, nil } -func (x *JSSdk) getConversations(ctx *gin.Context) ([]ConversationMsg, error) { +func (x *JSSdk) getConversations(ctx *gin.Context) (*ConversationsResp, error) { req, err := a2r.ParseRequest[conversation.GetConversationsReq](ctx) if err != nil { return nil, err @@ -129,12 +145,18 @@ func (x *JSSdk) getConversations(ctx *gin.Context) ([]ConversationMsg, error) { return nil, err } if len(conversations) == 0 { - return nil, nil + return &ConversationsResp{}, nil } + req.ConversationIDs = datautil.Slice(conversations, func(c *conversation.Conversation) string { + return c.ConversationID + }) maxSeqs, err := field(ctx, x.msg.GetMaxSeqs, - &msg.GetMaxSeqsReq{ConversationIDs: datautil.Slice(conversations, func(c *conversation.Conversation) string { - return c.ConversationID - })}, (*msg.SeqsInfoResp).GetMaxSeqs) + &msg.GetMaxSeqsReq{ConversationIDs: req.ConversationIDs}, (*msg.SeqsInfoResp).GetMaxSeqs) + if err != nil { + return nil, err + } + readSeqs, err := field(ctx, x.msg.GetHasReadSeqs, + &msg.GetHasReadSeqsReq{UserID: req.OwnerUserID, ConversationIDs: req.ConversationIDs}, (*msg.SeqsInfoResp).GetMaxSeqs) if err != nil { return nil, err } @@ -164,63 +186,19 @@ func (x *JSSdk) getConversations(ctx *gin.Context) ([]ConversationMsg, error) { resp = append(resp, ConversationMsg{ Conversation: c, LastMsg: lastMsg, + MaxSeq: maxSeqs[c.ConversationID], + ReadSeq: readSeqs[c.ConversationID], }) } - return resp, nil -} - -type ConversationMsg struct { - Conversation *conversation.Conversation `json:"conversation"` - LastMsg *sdkws.MsgData `json:"lastMsg"` - //ReadSeq int64 `json:"readSeq"` - //MaxSeq int64 `json:"maxSeq"` - //MaxSeqTime int64 `json:"maxSeqTime"` -} - -type sortActiveConversations struct { - Conversation []*msg.ActiveConversation - PinnedConversationIDs map[string]struct{} -} - -func (s sortActiveConversations) Top(limit int) []*msg.ActiveConversation { - if limit > 0 && len(s.Conversation) > limit { - return s.Conversation[:limit] - } - return s.Conversation -} - -func (s sortActiveConversations) Len() int { - return len(s.Conversation) -} - -func (s sortActiveConversations) Less(i, j int) bool { - iv, jv := s.Conversation[i], s.Conversation[j] - _, ip := s.PinnedConversationIDs[iv.ConversationID] - _, jp := s.PinnedConversationIDs[jv.ConversationID] - if ip != jp { - return ip - } - return iv.LastTime > jv.LastTime -} - -func (s sortActiveConversations) Swap(i, j int) { - s.Conversation[i], s.Conversation[j] = s.Conversation[j], s.Conversation[i] -} - -func field[A, B, C any](ctx context.Context, fn func(ctx context.Context, req *A, opts ...grpc.CallOption) (*B, error), req *A, get func(*B) C) (C, error) { - resp, err := fn(ctx, req) - if err != nil { - var c C - return c, err - } - return get(resp), nil -} - -func call[R any](c *gin.Context, fn func(ctx *gin.Context) (R, error)) { - resp, err := fn(c) - if err != nil { - apiresp.GinError(c, err) - return + var unreadCount int64 + for conversationID, maxSeq := range maxSeqs { + count := maxSeq - readSeqs[conversationID] + if count > 0 { + unreadCount += count + } } - apiresp.GinSuccess(c, resp) + return &ConversationsResp{ + Conversations: resp, + UnreadCount: unreadCount, + }, nil } diff --git a/internal/api/jssdk/sort.go b/internal/api/jssdk/sort.go new file mode 100644 index 000000000..f5fd04148 --- /dev/null +++ b/internal/api/jssdk/sort.go @@ -0,0 +1,33 @@ +package jssdk + +import "github.com/openimsdk/protocol/msg" + +type sortActiveConversations struct { + Conversation []*msg.ActiveConversation + PinnedConversationIDs map[string]struct{} +} + +func (s sortActiveConversations) Top(limit int) []*msg.ActiveConversation { + if limit > 0 && len(s.Conversation) > limit { + return s.Conversation[:limit] + } + return s.Conversation +} + +func (s sortActiveConversations) Len() int { + return len(s.Conversation) +} + +func (s sortActiveConversations) Less(i, j int) bool { + iv, jv := s.Conversation[i], s.Conversation[j] + _, ip := s.PinnedConversationIDs[iv.ConversationID] + _, jp := s.PinnedConversationIDs[jv.ConversationID] + if ip != jp { + return ip + } + return iv.LastTime > jv.LastTime +} + +func (s sortActiveConversations) Swap(i, j int) { + s.Conversation[i], s.Conversation[j] = s.Conversation[j], s.Conversation[i] +} diff --git a/internal/api/jssdk/stu.go b/internal/api/jssdk/stu.go new file mode 100644 index 000000000..2f63975b3 --- /dev/null +++ b/internal/api/jssdk/stu.go @@ -0,0 +1,22 @@ +package jssdk + +import ( + "github.com/openimsdk/protocol/conversation" + "github.com/openimsdk/protocol/sdkws" +) + +type ActiveConversationsReq struct { + Count int `json:"count"` +} + +type ConversationMsg struct { + Conversation *conversation.Conversation `json:"conversation"` + LastMsg *sdkws.MsgData `json:"lastMsg"` + MaxSeq int64 `json:"maxSeq"` + ReadSeq int64 `json:"readSeq"` +} + +type ConversationsResp struct { + UnreadCount int64 `json:"unreadCount"` + Conversations []ConversationMsg `json:"conversations"` +} diff --git a/internal/api/jssdk/tools.go b/internal/api/jssdk/tools.go new file mode 100644 index 000000000..c57457d9f --- /dev/null +++ b/internal/api/jssdk/tools.go @@ -0,0 +1,26 @@ +package jssdk + +import ( + "context" + "github.com/gin-gonic/gin" + "github.com/openimsdk/tools/apiresp" + "google.golang.org/grpc" +) + +func field[A, B, C any](ctx context.Context, fn func(ctx context.Context, req *A, opts ...grpc.CallOption) (*B, error), req *A, get func(*B) C) (C, error) { + resp, err := fn(ctx, req) + if err != nil { + var c C + return c, err + } + return get(resp), nil +} + +func call[R any](c *gin.Context, fn func(ctx *gin.Context) (R, error)) { + resp, err := fn(c) + if err != nil { + apiresp.GinError(c, err) + return + } + apiresp.GinSuccess(c, resp) +} diff --git a/internal/api/router.go b/internal/api/router.go index 967814ef3..adae5f806 100644 --- a/internal/api/router.go +++ b/internal/api/router.go @@ -2,6 +2,7 @@ package api import ( "fmt" + "github.com/openimsdk/open-im-server/v3/internal/api/jssdk" "github.com/gin-contrib/gzip" @@ -75,7 +76,7 @@ func newGinRouter(disCov discovery.SvcDiscoveryRegistry, config *Config) *gin.En r.Use(prommetricsGin(), gin.Recovery(), mw.CorsHandler(), mw.GinParseOperationID(), GinParseToken(authRpc)) u := NewUserApi(*userRpc) m := NewMessageApi(messageRpc, userRpc, config.Share.IMAdminUserID) - j := NewJSSdkApi(messageRpc.Client, conversationRpc.Client) + j := jssdk.NewJSSdkApi(messageRpc.Client, conversationRpc.Client) userRouterGroup := r.Group("/user") { userRouterGroup.POST("/user_register", u.UserRegister) @@ -246,8 +247,8 @@ func newGinRouter(disCov discovery.SvcDiscoveryRegistry, config *Config) *gin.En } jssdk := r.Group("/jssdk") - jssdk.POST("/get_conversation", j.GetConversations) - jssdk.POST("/get_active_conversation", j.GetActiveConversation) + jssdk.POST("/get_conversations", j.GetConversations) + jssdk.POST("/get_active_conversations", j.GetActiveConversations) return r } diff --git a/pkg/common/storage/controller/msg.go b/pkg/common/storage/controller/msg.go index e98e139c4..d579069b6 100644 --- a/pkg/common/storage/controller/msg.go +++ b/pkg/common/storage/controller/msg.go @@ -880,5 +880,6 @@ func (db *commonMsgDatabase) GetMaxSeqWithTime(ctx context.Context, conversation } func (db *commonMsgDatabase) GetMaxSeqsWithTime(ctx context.Context, conversationIDs []string) (map[string]database.SeqTime, error) { + // todo: only the time in the redis cache will be taken, not the message time return db.seqConversation.GetMaxSeqsWithTime(ctx, conversationIDs) }