feat: jssdk GetConversations, GetActiveConversation

pull/2664/head
withchao 1 year ago
parent 95a1a9bb82
commit 31f5a4e51c

@ -1,20 +1,20 @@
package api package jssdk
import ( import (
"context"
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
"github.com/openimsdk/protocol/conversation" "github.com/openimsdk/protocol/conversation"
"github.com/openimsdk/protocol/msg" "github.com/openimsdk/protocol/msg"
"github.com/openimsdk/protocol/sdkws" "github.com/openimsdk/protocol/sdkws"
"github.com/openimsdk/tools/a2r" "github.com/openimsdk/tools/a2r"
"github.com/openimsdk/tools/apiresp"
"github.com/openimsdk/tools/mcontext" "github.com/openimsdk/tools/mcontext"
"github.com/openimsdk/tools/utils/datautil" "github.com/openimsdk/tools/utils/datautil"
"google.golang.org/grpc"
"sort" "sort"
) )
const limitGetActiveConversation = 100 const (
maxGetActiveConversation = 500
defaultGetActiveConversation = 100
)
func NewJSSdkApi(msg msg.MsgClient, conv conversation.ConversationClient) *JSSdk { func NewJSSdkApi(msg msg.MsgClient, conv conversation.ConversationClient) *JSSdk {
return &JSSdk{ return &JSSdk{
@ -28,15 +28,22 @@ type JSSdk struct {
conv conversation.ConversationClient conv conversation.ConversationClient
} }
func (x *JSSdk) GetActiveConversation(c *gin.Context) { func (x *JSSdk) GetActiveConversations(c *gin.Context) {
call(c, x.getActiveConversation) call(c, x.getActiveConversations)
} }
func (x *JSSdk) GetConversations(c *gin.Context) { func (x *JSSdk) GetConversations(c *gin.Context) {
call(c, x.getConversations) call(c, x.getConversations)
} }
func (x *JSSdk) getActiveConversation(ctx *gin.Context) ([]ConversationMsg, error) { func (x *JSSdk) getActiveConversations(ctx *gin.Context) (*ConversationsResp, error) {
req, err := a2r.ParseRequest[ActiveConversationsReq](ctx)
if err != nil {
return nil, err
}
if req.Count <= 0 || req.Count > maxGetActiveConversation {
req.Count = defaultGetActiveConversation
}
opUserID := mcontext.GetOpUserID(ctx) opUserID := mcontext.GetOpUserID(ctx)
conversationIDs, err := field(ctx, x.conv.GetConversationIDs, conversationIDs, err := field(ctx, x.conv.GetConversationIDs,
&conversation.GetConversationIDsReq{UserID: opUserID}, (*conversation.GetConversationIDsResp).GetConversationIDs) &conversation.GetConversationIDsReq{UserID: opUserID}, (*conversation.GetConversationIDsResp).GetConversationIDs)
@ -44,7 +51,12 @@ func (x *JSSdk) getActiveConversation(ctx *gin.Context) ([]ConversationMsg, erro
return nil, err return nil, err
} }
if len(conversationIDs) == 0 { if len(conversationIDs) == 0 {
return nil, nil return &ConversationsResp{}, nil
}
readSeq, err := field(ctx, x.msg.GetHasReadSeqs,
&msg.GetHasReadSeqsReq{UserID: opUserID, ConversationIDs: conversationIDs}, (*msg.SeqsInfoResp).GetMaxSeqs)
if err != nil {
return nil, err
} }
activeConversation, err := field(ctx, x.msg.GetActiveConversation, activeConversation, err := field(ctx, x.msg.GetActiveConversation,
&msg.GetActiveConversationReq{ConversationIDs: conversationIDs}, (*msg.GetActiveConversationResp).GetConversations) &msg.GetActiveConversationReq{ConversationIDs: conversationIDs}, (*msg.GetActiveConversationResp).GetConversations)
@ -52,7 +64,7 @@ func (x *JSSdk) getActiveConversation(ctx *gin.Context) ([]ConversationMsg, erro
return nil, err return nil, err
} }
if len(activeConversation) == 0 { if len(activeConversation) == 0 {
return nil, nil return &ConversationsResp{}, nil
} }
sortConversations := sortActiveConversations{ sortConversations := sortActiveConversations{
Conversation: activeConversation, Conversation: activeConversation,
@ -66,7 +78,7 @@ func (x *JSSdk) getActiveConversation(ctx *gin.Context) ([]ConversationMsg, erro
sortConversations.PinnedConversationIDs = datautil.SliceSet(pinnedConversationIDs) sortConversations.PinnedConversationIDs = datautil.SliceSet(pinnedConversationIDs)
} }
sort.Sort(&sortConversations) sort.Sort(&sortConversations)
sortList := sortConversations.Top(limitGetActiveConversation) sortList := sortConversations.Top(req.Count)
conversations, err := field(ctx, x.conv.GetConversations, conversations, err := field(ctx, x.conv.GetConversations,
&conversation.GetConversationsReq{ &conversation.GetConversationsReq{
OwnerUserID: opUserID, OwnerUserID: opUserID,
@ -76,11 +88,6 @@ func (x *JSSdk) getActiveConversation(ctx *gin.Context) ([]ConversationMsg, erro
if err != nil { if err != nil {
return nil, err return nil, err
} }
//readSeq, err := field(ctx, x.msg.GetHasReadSeqs,
// &msg.GetHasReadSeqsReq{UserID: opUserID, ConversationIDs: conversationIDs}, (*msg.SeqsInfoResp).GetMaxSeqs)
//if err != nil {
// return nil, err
//}
msgs, err := field(ctx, x.msg.GetSeqMessage, msgs, err := field(ctx, x.msg.GetSeqMessage,
&msg.GetSeqMessageReq{ &msg.GetSeqMessageReq{
UserID: opUserID, UserID: opUserID,
@ -110,15 +117,24 @@ func (x *JSSdk) getActiveConversation(ctx *gin.Context) ([]ConversationMsg, erro
resp = append(resp, ConversationMsg{ resp = append(resp, ConversationMsg{
Conversation: conv, Conversation: conv,
LastMsg: lastMsg, LastMsg: lastMsg,
//MaxSeq: c.MaxSeq, MaxSeq: c.MaxSeq,
//MaxSeqTime: c.LastTime, ReadSeq: readSeq[c.ConversationID],
//ReadSeq: readSeq[c.ConversationID],
}) })
} }
return resp, nil var unreadCount int64
for _, c := range activeConversation {
count := c.MaxSeq - readSeq[c.ConversationID]
if count > 0 {
unreadCount += count
}
}
return &ConversationsResp{
Conversations: resp,
UnreadCount: unreadCount,
}, nil
} }
func (x *JSSdk) getConversations(ctx *gin.Context) ([]ConversationMsg, error) { func (x *JSSdk) getConversations(ctx *gin.Context) (*ConversationsResp, error) {
req, err := a2r.ParseRequest[conversation.GetConversationsReq](ctx) req, err := a2r.ParseRequest[conversation.GetConversationsReq](ctx)
if err != nil { if err != nil {
return nil, err return nil, err
@ -129,12 +145,18 @@ func (x *JSSdk) getConversations(ctx *gin.Context) ([]ConversationMsg, error) {
return nil, err return nil, err
} }
if len(conversations) == 0 { if len(conversations) == 0 {
return nil, nil return &ConversationsResp{}, nil
} }
req.ConversationIDs = datautil.Slice(conversations, func(c *conversation.Conversation) string {
return c.ConversationID
})
maxSeqs, err := field(ctx, x.msg.GetMaxSeqs, maxSeqs, err := field(ctx, x.msg.GetMaxSeqs,
&msg.GetMaxSeqsReq{ConversationIDs: datautil.Slice(conversations, func(c *conversation.Conversation) string { &msg.GetMaxSeqsReq{ConversationIDs: req.ConversationIDs}, (*msg.SeqsInfoResp).GetMaxSeqs)
return c.ConversationID if err != nil {
})}, (*msg.SeqsInfoResp).GetMaxSeqs) return nil, err
}
readSeqs, err := field(ctx, x.msg.GetHasReadSeqs,
&msg.GetHasReadSeqsReq{UserID: req.OwnerUserID, ConversationIDs: req.ConversationIDs}, (*msg.SeqsInfoResp).GetMaxSeqs)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -164,63 +186,19 @@ func (x *JSSdk) getConversations(ctx *gin.Context) ([]ConversationMsg, error) {
resp = append(resp, ConversationMsg{ resp = append(resp, ConversationMsg{
Conversation: c, Conversation: c,
LastMsg: lastMsg, LastMsg: lastMsg,
MaxSeq: maxSeqs[c.ConversationID],
ReadSeq: readSeqs[c.ConversationID],
}) })
} }
return resp, nil var unreadCount int64
} for conversationID, maxSeq := range maxSeqs {
count := maxSeq - readSeqs[conversationID]
type ConversationMsg struct { if count > 0 {
Conversation *conversation.Conversation `json:"conversation"` unreadCount += count
LastMsg *sdkws.MsgData `json:"lastMsg"` }
//ReadSeq int64 `json:"readSeq"`
//MaxSeq int64 `json:"maxSeq"`
//MaxSeqTime int64 `json:"maxSeqTime"`
}
type sortActiveConversations struct {
Conversation []*msg.ActiveConversation
PinnedConversationIDs map[string]struct{}
}
func (s sortActiveConversations) Top(limit int) []*msg.ActiveConversation {
if limit > 0 && len(s.Conversation) > limit {
return s.Conversation[:limit]
}
return s.Conversation
}
func (s sortActiveConversations) Len() int {
return len(s.Conversation)
}
func (s sortActiveConversations) Less(i, j int) bool {
iv, jv := s.Conversation[i], s.Conversation[j]
_, ip := s.PinnedConversationIDs[iv.ConversationID]
_, jp := s.PinnedConversationIDs[jv.ConversationID]
if ip != jp {
return ip
}
return iv.LastTime > jv.LastTime
}
func (s sortActiveConversations) Swap(i, j int) {
s.Conversation[i], s.Conversation[j] = s.Conversation[j], s.Conversation[i]
}
func field[A, B, C any](ctx context.Context, fn func(ctx context.Context, req *A, opts ...grpc.CallOption) (*B, error), req *A, get func(*B) C) (C, error) {
resp, err := fn(ctx, req)
if err != nil {
var c C
return c, err
}
return get(resp), nil
}
func call[R any](c *gin.Context, fn func(ctx *gin.Context) (R, error)) {
resp, err := fn(c)
if err != nil {
apiresp.GinError(c, err)
return
} }
apiresp.GinSuccess(c, resp) return &ConversationsResp{
Conversations: resp,
UnreadCount: unreadCount,
}, nil
} }

@ -0,0 +1,33 @@
package jssdk
import "github.com/openimsdk/protocol/msg"
type sortActiveConversations struct {
Conversation []*msg.ActiveConversation
PinnedConversationIDs map[string]struct{}
}
func (s sortActiveConversations) Top(limit int) []*msg.ActiveConversation {
if limit > 0 && len(s.Conversation) > limit {
return s.Conversation[:limit]
}
return s.Conversation
}
func (s sortActiveConversations) Len() int {
return len(s.Conversation)
}
func (s sortActiveConversations) Less(i, j int) bool {
iv, jv := s.Conversation[i], s.Conversation[j]
_, ip := s.PinnedConversationIDs[iv.ConversationID]
_, jp := s.PinnedConversationIDs[jv.ConversationID]
if ip != jp {
return ip
}
return iv.LastTime > jv.LastTime
}
func (s sortActiveConversations) Swap(i, j int) {
s.Conversation[i], s.Conversation[j] = s.Conversation[j], s.Conversation[i]
}

@ -0,0 +1,22 @@
package jssdk
import (
"github.com/openimsdk/protocol/conversation"
"github.com/openimsdk/protocol/sdkws"
)
type ActiveConversationsReq struct {
Count int `json:"count"`
}
type ConversationMsg struct {
Conversation *conversation.Conversation `json:"conversation"`
LastMsg *sdkws.MsgData `json:"lastMsg"`
MaxSeq int64 `json:"maxSeq"`
ReadSeq int64 `json:"readSeq"`
}
type ConversationsResp struct {
UnreadCount int64 `json:"unreadCount"`
Conversations []ConversationMsg `json:"conversations"`
}

@ -0,0 +1,26 @@
package jssdk
import (
"context"
"github.com/gin-gonic/gin"
"github.com/openimsdk/tools/apiresp"
"google.golang.org/grpc"
)
func field[A, B, C any](ctx context.Context, fn func(ctx context.Context, req *A, opts ...grpc.CallOption) (*B, error), req *A, get func(*B) C) (C, error) {
resp, err := fn(ctx, req)
if err != nil {
var c C
return c, err
}
return get(resp), nil
}
func call[R any](c *gin.Context, fn func(ctx *gin.Context) (R, error)) {
resp, err := fn(c)
if err != nil {
apiresp.GinError(c, err)
return
}
apiresp.GinSuccess(c, resp)
}

@ -2,6 +2,7 @@ package api
import ( import (
"fmt" "fmt"
"github.com/openimsdk/open-im-server/v3/internal/api/jssdk"
"github.com/gin-contrib/gzip" "github.com/gin-contrib/gzip"
@ -75,7 +76,7 @@ func newGinRouter(disCov discovery.SvcDiscoveryRegistry, config *Config) *gin.En
r.Use(prommetricsGin(), gin.Recovery(), mw.CorsHandler(), mw.GinParseOperationID(), GinParseToken(authRpc)) r.Use(prommetricsGin(), gin.Recovery(), mw.CorsHandler(), mw.GinParseOperationID(), GinParseToken(authRpc))
u := NewUserApi(*userRpc) u := NewUserApi(*userRpc)
m := NewMessageApi(messageRpc, userRpc, config.Share.IMAdminUserID) m := NewMessageApi(messageRpc, userRpc, config.Share.IMAdminUserID)
j := NewJSSdkApi(messageRpc.Client, conversationRpc.Client) j := jssdk.NewJSSdkApi(messageRpc.Client, conversationRpc.Client)
userRouterGroup := r.Group("/user") userRouterGroup := r.Group("/user")
{ {
userRouterGroup.POST("/user_register", u.UserRegister) userRouterGroup.POST("/user_register", u.UserRegister)
@ -246,8 +247,8 @@ func newGinRouter(disCov discovery.SvcDiscoveryRegistry, config *Config) *gin.En
} }
jssdk := r.Group("/jssdk") jssdk := r.Group("/jssdk")
jssdk.POST("/get_conversation", j.GetConversations) jssdk.POST("/get_conversations", j.GetConversations)
jssdk.POST("/get_active_conversation", j.GetActiveConversation) jssdk.POST("/get_active_conversations", j.GetActiveConversations)
return r return r
} }

@ -880,5 +880,6 @@ func (db *commonMsgDatabase) GetMaxSeqWithTime(ctx context.Context, conversation
} }
func (db *commonMsgDatabase) GetMaxSeqsWithTime(ctx context.Context, conversationIDs []string) (map[string]database.SeqTime, error) { func (db *commonMsgDatabase) GetMaxSeqsWithTime(ctx context.Context, conversationIDs []string) (map[string]database.SeqTime, error) {
// todo: only the time in the redis cache will be taken, not the message time
return db.seqConversation.GetMaxSeqsWithTime(ctx, conversationIDs) return db.seqConversation.GetMaxSeqsWithTime(ctx, conversationIDs)
} }

Loading…
Cancel
Save