@ -17,6 +17,8 @@ package conversation
import (
"context"
"errors"
"github.com/OpenIMSDK/protocol/sdkws"
"sort"
"github.com/OpenIMSDK/tools/tx"
@ -41,6 +43,8 @@ import (
)
type conversationServer struct {
msgRpcClient * rpcclient . MessageRpcClient
user * rpcclient . UserRpcClient
groupRpcClient * rpcclient . GroupRpcClient
conversationDatabase controller . ConversationDatabase
conversationNotificationSender * notification . ConversationNotificationSender
@ -61,7 +65,10 @@ func Start(client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) e
}
groupRpcClient := rpcclient . NewGroupRpcClient ( client )
msgRpcClient := rpcclient . NewMessageRpcClient ( client )
userRpcClient := rpcclient . NewUserRpcClient ( client )
pbconversation . RegisterConversationServer ( server , & conversationServer {
msgRpcClient : & msgRpcClient ,
user : & userRpcClient ,
conversationNotificationSender : notification . NewConversationNotificationSender ( & msgRpcClient ) ,
groupRpcClient : & groupRpcClient ,
conversationDatabase : controller . NewConversationDatabase ( conversationDB , cache . NewConversationRedis ( rdb , cache . GetDefaultOpt ( ) , conversationDB ) , tx . NewMongo ( mongo . GetClient ( ) ) ) ,
@ -82,6 +89,73 @@ func (c *conversationServer) GetConversation(ctx context.Context, req *pbconvers
return resp , nil
}
func ( m * conversationServer ) GetConversationList ( ctx context . Context , req * pbconversation . GetConversationListReq ) ( resp * pbconversation . GetConversationListResp , err error ) {
log . ZDebug ( ctx , "GetConversationList" , "seqs" , req , "userID" , req . UserID )
var conversationIDs [ ] string
if len ( req . ConversationIDs ) == 0 {
conversationIDs , err = m . conversationDatabase . GetConversationIDs ( ctx , req . UserID )
if err != nil {
return nil , err
}
} else {
conversationIDs = req . ConversationIDs
}
conversations , err := m . conversationDatabase . FindConversations ( ctx , req . UserID , conversationIDs )
if err != nil {
return nil , err
}
if len ( conversations ) == 0 {
return nil , errs . ErrRecordNotFound . Wrap ( )
}
maxSeqs , err := m . msgRpcClient . GetMaxSeqs ( ctx , conversationIDs )
if err != nil {
return nil , err
}
chatLogs , err := m . msgRpcClient . GetMsgByConversationIDs ( ctx , conversationIDs , maxSeqs )
if err != nil {
return nil , err
}
conversationMsg , err := m . getConversationInfo ( ctx , chatLogs , req . UserID )
if err != nil {
return nil , err
}
hasReadSeqs , err := m . msgRpcClient . GetHasReadSeqs ( ctx , req . UserID , conversationIDs )
if err != nil {
return nil , err
}
conversation_unreadCount := make ( map [ string ] int64 )
for conversationID , maxSeq := range maxSeqs {
conversation_unreadCount [ conversationID ] = maxSeq - hasReadSeqs [ conversationID ]
}
conversation_isPinkTime := make ( map [ int64 ] string )
conversation_notPinkTime := make ( map [ int64 ] string )
for _ , v := range conversations {
conversationID := v . ConversationID
time := conversationMsg [ conversationID ] . MsgInfo . LatestMsgRecvTime
conversationMsg [ conversationID ] . RecvMsgOpt = v . RecvMsgOpt
if v . IsPinned {
conversationMsg [ conversationID ] . IsPinned = v . IsPinned
conversation_isPinkTime [ time ] = conversationID
continue
}
conversation_notPinkTime [ time ] = conversationID
}
resp = & pbconversation . GetConversationListResp {
ConversationElems : [ ] * pbconversation . ConversationElem { } ,
}
m . conversationSort ( conversation_isPinkTime , resp , conversation_unreadCount , conversationMsg )
m . conversationSort ( conversation_notPinkTime , resp , conversation_unreadCount , conversationMsg )
return resp , nil
}
func ( c * conversationServer ) GetAllConversations ( ctx context . Context , req * pbconversation . GetAllConversationsReq ) ( * pbconversation . GetAllConversationsResp , error ) {
conversations , err := c . conversationDatabase . GetUserAllConversation ( ctx , req . OwnerUserID )
if err != nil {
@ -348,3 +422,102 @@ func (c *conversationServer) GetConversationOfflinePushUserIDs(
}
return & pbconversation . GetConversationOfflinePushUserIDsResp { UserIDs : utils . Keys ( userIDSet ) } , nil
}
func ( c * conversationServer ) conversationSort (
conversations map [ int64 ] string ,
resp * pbconversation . GetConversationListResp ,
conversation_unreadCount map [ string ] int64 ,
conversationMsg map [ string ] * pbconversation . ConversationElem ,
) {
keys := [ ] int64 { }
for key := range conversations {
keys = append ( keys , key )
}
sort . Slice ( keys [ : ] , func ( i , j int ) bool {
return keys [ i ] > keys [ j ]
} )
index := 0
cons := make ( [ ] * pbconversation . ConversationElem , len ( conversations ) )
for _ , v := range keys {
conversationID := conversations [ v ]
conversationElem := conversationMsg [ conversationID ]
conversationElem . UnreadCount = conversation_unreadCount [ conversationID ]
cons [ index ] = conversationElem
index ++
}
resp . ConversationElems = append ( resp . ConversationElems , cons ... )
}
func ( c * conversationServer ) getConversationInfo (
ctx context . Context ,
chatLogs map [ string ] * sdkws . MsgData ,
userID string ) ( map [ string ] * pbconversation . ConversationElem , error ) {
var (
sendIDs [ ] string
groupIDs [ ] string
sendMap = make ( map [ string ] * sdkws . UserInfo )
groupMap = make ( map [ string ] * sdkws . GroupInfo )
conversationMsg = make ( map [ string ] * pbconversation . ConversationElem )
)
for _ , chatLog := range chatLogs {
switch chatLog . SessionType {
case constant . SingleChatType :
if chatLog . SendID == userID {
sendIDs = append ( sendIDs , chatLog . RecvID )
}
sendIDs = append ( sendIDs , chatLog . SendID )
case constant . GroupChatType , constant . SuperGroupChatType :
groupIDs = append ( groupIDs , chatLog . GroupID )
sendIDs = append ( sendIDs , chatLog . SendID )
}
}
if len ( sendIDs ) != 0 {
sendInfos , err := c . user . GetUsersInfo ( ctx , sendIDs )
if err != nil {
return nil , err
}
for _ , sendInfo := range sendInfos {
sendMap [ sendInfo . UserID ] = sendInfo
}
}
if len ( groupIDs ) != 0 {
groupInfos , err := c . groupRpcClient . GetGroupInfos ( ctx , groupIDs , false )
if err != nil {
return nil , err
}
for _ , groupInfo := range groupInfos {
groupMap [ groupInfo . GroupID ] = groupInfo
}
}
for conversationID , chatLog := range chatLogs {
pbchatLog := & pbconversation . ConversationElem { }
msgInfo := & pbconversation . MsgInfo { }
if err := utils . CopyStructFields ( msgInfo , chatLog ) ; err != nil {
return nil , err
}
switch chatLog . SessionType {
case constant . SingleChatType :
if chatLog . SendID == userID {
msgInfo . FaceURL = sendMap [ chatLog . RecvID ] . FaceURL
msgInfo . SenderName = sendMap [ chatLog . RecvID ] . Nickname
break
}
msgInfo . FaceURL = sendMap [ chatLog . SendID ] . FaceURL
msgInfo . SenderName = sendMap [ chatLog . SendID ] . Nickname
case constant . GroupChatType , constant . SuperGroupChatType :
msgInfo . GroupName = groupMap [ chatLog . GroupID ] . GroupName
msgInfo . GroupFaceURL = groupMap [ chatLog . GroupID ] . FaceURL
msgInfo . GroupMemberCount = groupMap [ chatLog . GroupID ] . MemberCount
msgInfo . GroupID = chatLog . GroupID
msgInfo . GroupType = groupMap [ chatLog . GroupID ] . GroupType
msgInfo . SenderName = sendMap [ chatLog . SendID ] . Nickname
}
pbchatLog . ConversationID = conversationID
msgInfo . LatestMsgRecvTime = chatLog . SendTime
pbchatLog . MsgInfo = msgInfo
conversationMsg [ conversationID ] = pbchatLog
}
return conversationMsg , nil
}