|
|
|
@ -12,7 +12,6 @@ import (
|
|
|
|
|
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/log"
|
|
|
|
|
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/mcontext"
|
|
|
|
|
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/prome"
|
|
|
|
|
pbConversation "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/conversation"
|
|
|
|
|
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws"
|
|
|
|
|
"github.com/OpenIMSDK/Open-IM-Server/pkg/rpcclient"
|
|
|
|
|
"github.com/OpenIMSDK/Open-IM-Server/pkg/utils"
|
|
|
|
@ -170,15 +169,20 @@ func (och *OnlineHistoryRedisConsumerHandler) handleMsg(ctx context.Context, con
|
|
|
|
|
currentMaxSeq, err = och.msgDatabase.GetGroupMaxSeq(ctx, conversationID)
|
|
|
|
|
if err == redis.Nil {
|
|
|
|
|
log.ZInfo(ctx, "group chat first create conversation", "conversationID", conversationID)
|
|
|
|
|
if err := och.GroupChatFirstCreateConversation(ctx, storageList[0]); err != nil {
|
|
|
|
|
log.ZError(ctx, "single chat first create conversation error", err, "conversationID", conversationID)
|
|
|
|
|
userIDs, err := och.groupRpcClient.GetGroupMemberIDs(ctx, storageList[0].GroupID)
|
|
|
|
|
if err != nil {
|
|
|
|
|
log.ZError(ctx, "get group member ids error", err, "conversationID", conversationID)
|
|
|
|
|
} else {
|
|
|
|
|
if err := och.conversationRpcClient.GroupChatFirstCreateConversation(ctx, storageList[0].GroupID, userIDs); err != nil {
|
|
|
|
|
log.ZError(ctx, "single chat first create conversation error", err, "conversationID", conversationID)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
currentMaxSeq, err = och.msgDatabase.GetUserMaxSeq(ctx, conversationID)
|
|
|
|
|
if err == redis.Nil {
|
|
|
|
|
log.ZInfo(ctx, "single chat first create conversation", "conversationID", conversationID)
|
|
|
|
|
if err := och.SingleChatFirstCreateConversation(ctx, storageList[0]); err != nil {
|
|
|
|
|
if err := och.conversationRpcClient.SingleChatFirstCreateConversation(ctx, storageList[0].RecvID, storageList[0].SendID); err != nil {
|
|
|
|
|
log.ZError(ctx, "single chat first create conversation error", err, "conversationID", conversationID)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
@ -206,35 +210,6 @@ func (och *OnlineHistoryRedisConsumerHandler) handleMsg(ctx context.Context, con
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (och *OnlineHistoryRedisConsumerHandler) SingleChatFirstCreateConversation(ctx context.Context, msg *sdkws.MsgData) error {
|
|
|
|
|
conversation := new(pbConversation.Conversation)
|
|
|
|
|
conversationID := utils.GetConversationIDBySessionType(constant.SingleChatType, msg.RecvID, msg.SendID)
|
|
|
|
|
conversation.ConversationType = constant.SingleChatType
|
|
|
|
|
conversation2 := proto.Clone(conversation).(*pbConversation.Conversation)
|
|
|
|
|
conversation.OwnerUserID = msg.SendID
|
|
|
|
|
conversation.UserID = msg.RecvID
|
|
|
|
|
conversation.ConversationID = conversationID
|
|
|
|
|
conversation2.OwnerUserID = msg.RecvID
|
|
|
|
|
conversation2.UserID = msg.SendID
|
|
|
|
|
conversation2.ConversationID = conversationID
|
|
|
|
|
log.ZDebug(ctx, "create single conversation", "conversation", conversation, "conversation2", conversation2)
|
|
|
|
|
return och.conversationRpcClient.CreateConversationsWithoutNotification(ctx, []*pbConversation.Conversation{conversation, conversation2})
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (och *OnlineHistoryRedisConsumerHandler) GroupChatFirstCreateConversation(ctx context.Context, msg *sdkws.MsgData) error {
|
|
|
|
|
userIDs, err := och.groupRpcClient.GetGroupMemberIDs(ctx, msg.GroupID)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
var conversations []*pbConversation.Conversation
|
|
|
|
|
for _, v := range userIDs {
|
|
|
|
|
conversation := pbConversation.Conversation{ConversationType: constant.SuperGroupChatType, GroupID: msg.GroupID, OwnerUserID: v, ConversationID: utils.GetConversationIDBySessionType(constant.SuperGroupChatType, msg.GroupID)}
|
|
|
|
|
conversations = append(conversations, &conversation)
|
|
|
|
|
}
|
|
|
|
|
log.ZDebug(ctx, "create group conversation", "conversations", conversations)
|
|
|
|
|
return och.conversationRpcClient.CreateConversationsWithoutNotification(ctx, conversations)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (och *OnlineHistoryRedisConsumerHandler) MessagesDistributionHandle() {
|
|
|
|
|
for {
|
|
|
|
|
aggregationMsgs := make(map[string][]*ContextMsg, ChannelNum)
|
|
|
|
|