From 39266f3db63fd87337755c1eb41c59ca57f6fbd9 Mon Sep 17 00:00:00 2001 From: withchao <993506633@qq.com> Date: Mon, 11 Sep 2023 14:36:47 +0800 Subject: [PATCH] conversation --- go.mod | 2 +- go.sum | 4 +-- internal/push/push_to_client.go | 19 ++++++++----- internal/rpc/conversation/conversaion.go | 29 ++++++++++++++------ pkg/common/db/cache/conversation.go | 27 ++++++++++++++++++ pkg/common/db/controller/conversation.go | 21 ++++++++++---- pkg/common/db/relation/conversation_model.go | 18 +++++++++--- pkg/common/db/table/relation/conversation.go | 2 +- 8 files changed, 93 insertions(+), 29 deletions(-) diff --git a/go.mod b/go.mod index 262c2e82e..1e5256b50 100644 --- a/go.mod +++ b/go.mod @@ -37,7 +37,7 @@ require github.com/google/uuid v1.3.1 require ( github.com/IBM/sarama v1.41.1 - github.com/OpenIMSDK/protocol v0.0.19 + github.com/OpenIMSDK/protocol v0.0.20 github.com/OpenIMSDK/tools v0.0.14 github.com/aliyun/aliyun-oss-go-sdk v2.2.9+incompatible github.com/go-redis/redis v6.15.9+incompatible diff --git a/go.sum b/go.sum index ee2845684..fe4a784d8 100644 --- a/go.sum +++ b/go.sum @@ -19,8 +19,8 @@ firebase.google.com/go v3.13.0+incompatible/go.mod h1:xlah6XbEyW6tbfSklcfe5FHJIw github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/IBM/sarama v1.41.1 h1:B4/TdHce/8Ipza+qrLIeNJ9D1AOxZVp/3uDv6H/dp2M= github.com/IBM/sarama v1.41.1/go.mod h1:JFCPURVskaipJdKRFkiE/OZqQHw7jqliaJmRwXCmSSw= -github.com/OpenIMSDK/protocol v0.0.19 h1:uhIToUMv6AZK0g5k4uuSEMRwuRdeLSUmM4yMG0Hw5Rk= -github.com/OpenIMSDK/protocol v0.0.19/go.mod h1:F25dFrwrIx3lkNoiuf6FkCfxuwf8L4Z8UIsdTHP/r0Y= +github.com/OpenIMSDK/protocol v0.0.20 h1:x6tAbBTBiaFdvT98nM6IRnEIHsVr0Weh/fAaTLBrN64= +github.com/OpenIMSDK/protocol v0.0.20/go.mod h1:F25dFrwrIx3lkNoiuf6FkCfxuwf8L4Z8UIsdTHP/r0Y= github.com/OpenIMSDK/tools v0.0.14 h1:WLof/+WxyPyRST+QkoTKubYCiV73uCLiL8pgnpH/yKQ= github.com/OpenIMSDK/tools v0.0.14/go.mod h1:eg+q4A34Qmu73xkY0mt37FHGMCMfC6CtmOnm0kFEGFI= github.com/QcloudApi/qcloud_sign_golang v0.0.0-20141224014652-e4130a326409/go.mod h1:1pk82RBxDY/JZnPQrtqHlUFfCctgdorsd9M06fMynOM= diff --git a/internal/push/push_to_client.go b/internal/push/push_to_client.go index 5c40e7dfb..7ebf47db9 100644 --- a/internal/push/push_to_client.go +++ b/internal/push/push_to_client.go @@ -235,20 +235,25 @@ func (p *Pusher) Push2SuperGroup(ctx context.Context, groupID string, msg *sdkws if len(offlinePushUserIDs) > 0 { needOfflinePushUserIDs = offlinePushUserIDs } - resp, err := p.conversationRpcClient.Client.GetConversationNeedOfflinePushUserIDs( - ctx, - &conversation.GetConversationNeedOfflinePushUserIDsReq{ConversationID: utils.GenGroupConversationID(groupID), OwnerUserIDs: needOfflinePushUserIDs}, - ) + resp, err := p.conversationRpcClient.Client.GetConversationNotOfflinePushUserIDs(ctx, &conversation.GetConversationNotOfflinePushUserIDsReq{ConversationID: utils.GenGroupConversationID(groupID), UserIDs: needOfflinePushUserIDs}) if err != nil { return err } - if len(resp.UserIDs) > 0 { - err = p.offlinePushMsg(ctx, groupID, msg, resp.UserIDs) + userIDSet := make(map[string]struct{}) + for _, userID := range needOfflinePushUserIDs { + userIDSet[userID] = struct{}{} + } + for _, userID := range resp.UserIDs { + delete(userIDSet, userID) + } + if len(userIDSet) > 0 { + userIDs := utils.Keys(userIDSet) + err = p.offlinePushMsg(ctx, groupID, msg, userIDs) if err != nil { log.ZError(ctx, "offlinePushMsg failed", err, "groupID", groupID, "msg", msg) return err } - if _, err := p.GetConnsAndOnlinePush(ctx, msg, utils.IntersectString(resp.UserIDs, WebAndPcBackgroundUserIDs)); err != nil { + if _, err := p.GetConnsAndOnlinePush(ctx, msg, utils.IntersectString(userIDs, WebAndPcBackgroundUserIDs)); err != nil { log.ZError(ctx, "offlinePushMsg failed", err, "groupID", groupID, "msg", msg, "userIDs", utils.IntersectString(needOfflinePushUserIDs, WebAndPcBackgroundUserIDs)) return err } diff --git a/internal/rpc/conversation/conversaion.go b/internal/rpc/conversation/conversaion.go index acd996ed6..956974fca 100644 --- a/internal/rpc/conversation/conversaion.go +++ b/internal/rpc/conversation/conversaion.go @@ -301,19 +301,32 @@ func (c *conversationServer) GetConversationsByConversationID( return &pbconversation.GetConversationsByConversationIDResp{Conversations: convert.ConversationsDB2Pb(conversations)}, nil } -func (c *conversationServer) GetConversationNeedOfflinePushUserIDs( +func (c *conversationServer) GetConversationNotOfflinePushUserIDs( ctx context.Context, - req *pbconversation.GetConversationNeedOfflinePushUserIDsReq, -) (*pbconversation.GetConversationNeedOfflinePushUserIDsResp, error) { + req *pbconversation.GetConversationNotOfflinePushUserIDsReq, +) (*pbconversation.GetConversationNotOfflinePushUserIDsResp, error) { if req.ConversationID == "" { return nil, errs.ErrArgs.Wrap("conversationID is empty") } - if len(req.OwnerUserIDs) == 0 { - return &pbconversation.GetConversationNeedOfflinePushUserIDsResp{}, nil - } - userIDs, err := c.conversationDatabase.GetConversationNeedOfflinePushUserIDs(ctx, req.ConversationID, req.OwnerUserIDs) + userIDs, err := c.conversationDatabase.GetConversationNotReceiveMessageUserIDs(ctx, req.ConversationID) if err != nil { return nil, err } - return &pbconversation.GetConversationNeedOfflinePushUserIDsResp{UserIDs: userIDs}, nil + if len(req.UserIDs) == 0 { + return &pbconversation.GetConversationNotOfflinePushUserIDsResp{UserIDs: userIDs}, nil + } + if len(userIDs) == 0 { + return &pbconversation.GetConversationNotOfflinePushUserIDsResp{}, nil + } + userIDSet := make(map[string]struct{}) + for _, userID := range userIDs { + userIDSet[userID] = struct{}{} + } + resp := &pbconversation.GetConversationNotOfflinePushUserIDsResp{UserIDs: make([]string, 0, len(req.UserIDs))} + for _, userID := range req.UserIDs { + if _, ok := userIDSet[userID]; !ok { + resp.UserIDs = append(resp.UserIDs, userID) + } + } + return resp, nil } diff --git a/pkg/common/db/cache/conversation.go b/pkg/common/db/cache/conversation.go index d02b021e3..083cf6d0b 100644 --- a/pkg/common/db/cache/conversation.go +++ b/pkg/common/db/cache/conversation.go @@ -38,6 +38,7 @@ const ( recvMsgOptKey = "RECV_MSG_OPT:" superGroupRecvMsgNotNotifyUserIDsKey = "SUPER_GROUP_RECV_MSG_NOT_NOTIFY_USER_IDS:" superGroupRecvMsgNotNotifyUserIDsHashKey = "SUPER_GROUP_RECV_MSG_NOT_NOTIFY_USER_IDS_HASH:" + conversationNotReceiveMessageUserIDsKey = "CONVERSATION_NOT_RECEIVE_MESSAGE_USER_IDS:" conversationExpireTime = time.Second * 60 * 60 * 12 ) @@ -83,6 +84,8 @@ type ConversationCache interface { conversationIDs []string, ) ([]*relationtb.ConversationModel, error) DelConversationByConversationID(conversationIDs ...string) ConversationCache + GetConversationNotReceiveMessageUserIDs(ctx context.Context, conversationID string) ([]string, error) + DelConversationNotReceiveMessageUserIDs(conversationIDs ...string) ConversationCache } func NewConversationRedis( @@ -153,6 +156,10 @@ func (c *ConversationRedisCache) getConversationHasReadSeqKey(ownerUserID, conve return conversationHasReadSeqKey + ownerUserID + ":" + conversationID } +func (c *ConversationRedisCache) getConversationNotReceiveMessageUserIDsKey(conversationID string) string { + return conversationNotReceiveMessageUserIDsKey + conversationID +} + func (c *ConversationRedisCache) GetUserConversationIDs(ctx context.Context, ownerUserID string) ([]string, error) { return getCache( ctx, @@ -432,3 +439,23 @@ func (c *ConversationRedisCache) GetConversationsByConversationID( func (c *ConversationRedisCache) DelConversationByConversationID(conversationIDs ...string) ConversationCache { panic("implement me") } + +func (c *ConversationRedisCache) GetConversationNotReceiveMessageUserIDs(ctx context.Context, conversationID string) ([]string, error) { + return getCache( + ctx, + c.rcClient, + c.getConversationNotReceiveMessageUserIDsKey(conversationID), + c.expireTime, + func(ctx context.Context) ([]string, error) { + return c.conversationDB.GetConversationNotReceiveMessageUserIDs(ctx, conversationID) + }, + ) +} + +func (c *ConversationRedisCache) DelConversationNotReceiveMessageUserIDs(conversationIDs ...string) ConversationCache { + cache := c.NewCache() + for _, conversationID := range conversationIDs { + cache.AddKeys(c.getConversationNotReceiveMessageUserIDsKey(conversationID)) + } + return cache +} diff --git a/pkg/common/db/controller/conversation.go b/pkg/common/db/controller/conversation.go index 99870bba1..c3dd6980e 100644 --- a/pkg/common/db/controller/conversation.go +++ b/pkg/common/db/controller/conversation.go @@ -53,7 +53,7 @@ type ConversationDatabase interface { GetUserAllHasReadSeqs(ctx context.Context, ownerUserID string) (map[string]int64, error) GetConversationsByConversationID(ctx context.Context, conversationIDs []string) ([]*relationtb.ConversationModel, error) GetConversationIDsNeedDestruct(ctx context.Context) ([]*relationtb.ConversationModel, error) - GetConversationNeedOfflinePushUserIDs(ctx context.Context, conversationID string, ownerUserIDs []string) ([]string, error) + GetConversationNotReceiveMessageUserIDs(ctx context.Context, conversationID string) ([]string, error) } func NewConversationDatabase(conversation relationtb.ConversationModelInterface, cache cache.ConversationCache, tx tx.Tx) ConversationDatabase { @@ -89,6 +89,9 @@ func (c *conversationDatabase) SetUsersConversationFiledTx(ctx context.Context, cache = cache.DelUserAllHasReadSeqs(userID, conversation.ConversationID) } } + if _, ok := filedMap["recv_msg_opt"]; ok { + cache = cache.DelConversationNotReceiveMessageUserIDs(conversation.ConversationID) + } } NotUserIDs := utils.DifferenceString(haveUserIDs, userIDs) log.ZDebug(ctx, "SetUsersConversationFiledTx", "NotUserIDs", NotUserIDs, "haveUserIDs", haveUserIDs, "userIDs", userIDs) @@ -122,7 +125,12 @@ func (c *conversationDatabase) UpdateUsersConversationFiled(ctx context.Context, if err != nil { return err } - return c.cache.DelUsersConversation(conversationID, userIDs...).ExecDel(ctx) + cache := c.cache.NewCache() + cache = cache.DelUsersConversation(conversationID, userIDs...) + if _, ok := args["recv_msg_opt"]; ok { + cache = cache.DelConversationNotReceiveMessageUserIDs(conversationID) + } + return cache.ExecDel(ctx) } func (c *conversationDatabase) CreateConversation(ctx context.Context, conversations []*relationtb.ConversationModel) error { @@ -133,6 +141,7 @@ func (c *conversationDatabase) CreateConversation(ctx context.Context, conversat cache := c.cache.NewCache() for _, conversation := range conversations { cache = cache.DelConversations(conversation.OwnerUserID, conversation.ConversationID) + cache = cache.DelConversationNotReceiveMessageUserIDs(conversation.ConversationID) userIDs = append(userIDs, conversation.OwnerUserID) } return cache.DelConversationIDs(userIDs...).DelUserConversationIDsHash(userIDs...).ExecDel(ctx) @@ -225,7 +234,7 @@ func (c *conversationDatabase) SetUserConversations(ctx context.Context, ownerUs if err != nil { return err } - cache = cache.DelConversationIDs(ownerUserID).DelUserConversationIDsHash(ownerUserID) + cache = cache.DelConversationIDs(ownerUserID).DelUserConversationIDsHash(ownerUserID).DelConversationNotReceiveMessageUserIDs(utils.Slice(notExistConversations, func(e *relationtb.ConversationModel) string { return e.ConversationID })...) } return nil }); err != nil { @@ -251,7 +260,7 @@ func (c *conversationDatabase) CreateGroupChatConversation(ctx context.Context, for _, v := range notExistUserIDs { conversation := relationtb.ConversationModel{ConversationType: constant.SuperGroupChatType, GroupID: groupID, OwnerUserID: v, ConversationID: conversationID} conversations = append(conversations, &conversation) - cache = cache.DelConversations(v, conversationID) + cache = cache.DelConversations(v, conversationID).DelConversationNotReceiveMessageUserIDs(conversationID) } cache = cache.DelConversationIDs(notExistUserIDs...).DelUserConversationIDsHash(notExistUserIDs...) if len(conversations) > 0 { @@ -298,6 +307,6 @@ func (c *conversationDatabase) GetConversationIDsNeedDestruct(ctx context.Contex return c.conversationDB.GetConversationIDsNeedDestruct(ctx) } -func (c *conversationDatabase) GetConversationNeedOfflinePushUserIDs(ctx context.Context, conversationID string, ownerUserIDs []string) ([]string, error) { - return c.conversationDB.GetConversationNeedOfflinePushUserIDs(ctx, conversationID, ownerUserIDs) +func (c *conversationDatabase) GetConversationNotReceiveMessageUserIDs(ctx context.Context, conversationID string) ([]string, error) { + return c.cache.GetConversationNotReceiveMessageUserIDs(ctx, conversationID) } diff --git a/pkg/common/db/relation/conversation_model.go b/pkg/common/db/relation/conversation_model.go index baf20a0d3..d5ca92ec2 100644 --- a/pkg/common/db/relation/conversation_model.go +++ b/pkg/common/db/relation/conversation_model.go @@ -217,13 +217,23 @@ func (c *ConversationGorm) GetConversationIDsNeedDestruct( ) } -func (c *ConversationGorm) GetConversationNeedOfflinePushUserIDs(ctx context.Context, conversationID string, ownerUserIDs []string) ([]string, error) { +func (c *ConversationGorm) GetConversationRecvMsgOpt(ctx context.Context, userID string, conversationID string) (int32, error) { + var recvMsgOpt int32 + return recvMsgOpt, errs.Wrap( + c.db(ctx). + Model(&relation.ConversationModel{}). + Where("conversation_id = ? and owner_user_id in ?", conversationID, userID). + Pluck("recv_msg_opt", &recvMsgOpt). + Error, + ) +} + +func (c *ConversationGorm) GetConversationNotReceiveMessageUserIDs(ctx context.Context, conversationID string) ([]string, error) { var userIDs []string return userIDs, errs.Wrap( c.db(ctx). Model(&relation.ConversationModel{}). - Where("conversation_id = ? and owner_user_id in ? and recv_msg_opt = ?", conversationID, ownerUserIDs, constant.ReceiveMessage). - Pluck("owner_user_id", &userIDs). - Error, + Where("conversation_id = ? and recv_msg_opt <> ?", conversationID, constant.ReceiveMessage). + Pluck("owner_user_id", &userIDs).Error, ) } diff --git a/pkg/common/db/table/relation/conversation.go b/pkg/common/db/table/relation/conversation.go index 2721f05e5..7e6c6bdf8 100644 --- a/pkg/common/db/table/relation/conversation.go +++ b/pkg/common/db/table/relation/conversation.go @@ -66,6 +66,6 @@ type ConversationModelInterface interface { GetUserAllHasReadSeqs(ctx context.Context, ownerUserID string) (hashReadSeqs map[string]int64, err error) GetConversationsByConversationID(ctx context.Context, conversationIDs []string) ([]*ConversationModel, error) GetConversationIDsNeedDestruct(ctx context.Context) ([]*ConversationModel, error) - GetConversationNeedOfflinePushUserIDs(ctx context.Context, conversationID string, ownerUserIDs []string) ([]string, error) + GetConversationNotReceiveMessageUserIDs(ctx context.Context, conversationID string) ([]string, error) NewTx(tx any) ConversationModelInterface }