conversation

pull/1054/head
withchao 2 years ago
parent bd13ab666f
commit 39266f3db6

@ -37,7 +37,7 @@ require github.com/google/uuid v1.3.1
require ( require (
github.com/IBM/sarama v1.41.1 github.com/IBM/sarama v1.41.1
github.com/OpenIMSDK/protocol v0.0.19 github.com/OpenIMSDK/protocol v0.0.20
github.com/OpenIMSDK/tools v0.0.14 github.com/OpenIMSDK/tools v0.0.14
github.com/aliyun/aliyun-oss-go-sdk v2.2.9+incompatible github.com/aliyun/aliyun-oss-go-sdk v2.2.9+incompatible
github.com/go-redis/redis v6.15.9+incompatible github.com/go-redis/redis v6.15.9+incompatible

@ -19,8 +19,8 @@ firebase.google.com/go v3.13.0+incompatible/go.mod h1:xlah6XbEyW6tbfSklcfe5FHJIw
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/IBM/sarama v1.41.1 h1:B4/TdHce/8Ipza+qrLIeNJ9D1AOxZVp/3uDv6H/dp2M= github.com/IBM/sarama v1.41.1 h1:B4/TdHce/8Ipza+qrLIeNJ9D1AOxZVp/3uDv6H/dp2M=
github.com/IBM/sarama v1.41.1/go.mod h1:JFCPURVskaipJdKRFkiE/OZqQHw7jqliaJmRwXCmSSw= github.com/IBM/sarama v1.41.1/go.mod h1:JFCPURVskaipJdKRFkiE/OZqQHw7jqliaJmRwXCmSSw=
github.com/OpenIMSDK/protocol v0.0.19 h1:uhIToUMv6AZK0g5k4uuSEMRwuRdeLSUmM4yMG0Hw5Rk= github.com/OpenIMSDK/protocol v0.0.20 h1:x6tAbBTBiaFdvT98nM6IRnEIHsVr0Weh/fAaTLBrN64=
github.com/OpenIMSDK/protocol v0.0.19/go.mod h1:F25dFrwrIx3lkNoiuf6FkCfxuwf8L4Z8UIsdTHP/r0Y= github.com/OpenIMSDK/protocol v0.0.20/go.mod h1:F25dFrwrIx3lkNoiuf6FkCfxuwf8L4Z8UIsdTHP/r0Y=
github.com/OpenIMSDK/tools v0.0.14 h1:WLof/+WxyPyRST+QkoTKubYCiV73uCLiL8pgnpH/yKQ= github.com/OpenIMSDK/tools v0.0.14 h1:WLof/+WxyPyRST+QkoTKubYCiV73uCLiL8pgnpH/yKQ=
github.com/OpenIMSDK/tools v0.0.14/go.mod h1:eg+q4A34Qmu73xkY0mt37FHGMCMfC6CtmOnm0kFEGFI= github.com/OpenIMSDK/tools v0.0.14/go.mod h1:eg+q4A34Qmu73xkY0mt37FHGMCMfC6CtmOnm0kFEGFI=
github.com/QcloudApi/qcloud_sign_golang v0.0.0-20141224014652-e4130a326409/go.mod h1:1pk82RBxDY/JZnPQrtqHlUFfCctgdorsd9M06fMynOM= github.com/QcloudApi/qcloud_sign_golang v0.0.0-20141224014652-e4130a326409/go.mod h1:1pk82RBxDY/JZnPQrtqHlUFfCctgdorsd9M06fMynOM=

@ -235,20 +235,25 @@ func (p *Pusher) Push2SuperGroup(ctx context.Context, groupID string, msg *sdkws
if len(offlinePushUserIDs) > 0 { if len(offlinePushUserIDs) > 0 {
needOfflinePushUserIDs = offlinePushUserIDs needOfflinePushUserIDs = offlinePushUserIDs
} }
resp, err := p.conversationRpcClient.Client.GetConversationNeedOfflinePushUserIDs( resp, err := p.conversationRpcClient.Client.GetConversationNotOfflinePushUserIDs(ctx, &conversation.GetConversationNotOfflinePushUserIDsReq{ConversationID: utils.GenGroupConversationID(groupID), UserIDs: needOfflinePushUserIDs})
ctx,
&conversation.GetConversationNeedOfflinePushUserIDsReq{ConversationID: utils.GenGroupConversationID(groupID), OwnerUserIDs: needOfflinePushUserIDs},
)
if err != nil { if err != nil {
return err return err
} }
if len(resp.UserIDs) > 0 { userIDSet := make(map[string]struct{})
err = p.offlinePushMsg(ctx, groupID, msg, resp.UserIDs) for _, userID := range needOfflinePushUserIDs {
userIDSet[userID] = struct{}{}
}
for _, userID := range resp.UserIDs {
delete(userIDSet, userID)
}
if len(userIDSet) > 0 {
userIDs := utils.Keys(userIDSet)
err = p.offlinePushMsg(ctx, groupID, msg, userIDs)
if err != nil { if err != nil {
log.ZError(ctx, "offlinePushMsg failed", err, "groupID", groupID, "msg", msg) log.ZError(ctx, "offlinePushMsg failed", err, "groupID", groupID, "msg", msg)
return err return err
} }
if _, err := p.GetConnsAndOnlinePush(ctx, msg, utils.IntersectString(resp.UserIDs, WebAndPcBackgroundUserIDs)); err != nil { if _, err := p.GetConnsAndOnlinePush(ctx, msg, utils.IntersectString(userIDs, WebAndPcBackgroundUserIDs)); err != nil {
log.ZError(ctx, "offlinePushMsg failed", err, "groupID", groupID, "msg", msg, "userIDs", utils.IntersectString(needOfflinePushUserIDs, WebAndPcBackgroundUserIDs)) log.ZError(ctx, "offlinePushMsg failed", err, "groupID", groupID, "msg", msg, "userIDs", utils.IntersectString(needOfflinePushUserIDs, WebAndPcBackgroundUserIDs))
return err return err
} }

@ -301,19 +301,32 @@ func (c *conversationServer) GetConversationsByConversationID(
return &pbconversation.GetConversationsByConversationIDResp{Conversations: convert.ConversationsDB2Pb(conversations)}, nil return &pbconversation.GetConversationsByConversationIDResp{Conversations: convert.ConversationsDB2Pb(conversations)}, nil
} }
func (c *conversationServer) GetConversationNeedOfflinePushUserIDs( func (c *conversationServer) GetConversationNotOfflinePushUserIDs(
ctx context.Context, ctx context.Context,
req *pbconversation.GetConversationNeedOfflinePushUserIDsReq, req *pbconversation.GetConversationNotOfflinePushUserIDsReq,
) (*pbconversation.GetConversationNeedOfflinePushUserIDsResp, error) { ) (*pbconversation.GetConversationNotOfflinePushUserIDsResp, error) {
if req.ConversationID == "" { if req.ConversationID == "" {
return nil, errs.ErrArgs.Wrap("conversationID is empty") return nil, errs.ErrArgs.Wrap("conversationID is empty")
} }
if len(req.OwnerUserIDs) == 0 { userIDs, err := c.conversationDatabase.GetConversationNotReceiveMessageUserIDs(ctx, req.ConversationID)
return &pbconversation.GetConversationNeedOfflinePushUserIDsResp{}, nil
}
userIDs, err := c.conversationDatabase.GetConversationNeedOfflinePushUserIDs(ctx, req.ConversationID, req.OwnerUserIDs)
if err != nil { if err != nil {
return nil, err return nil, err
} }
return &pbconversation.GetConversationNeedOfflinePushUserIDsResp{UserIDs: userIDs}, nil if len(req.UserIDs) == 0 {
return &pbconversation.GetConversationNotOfflinePushUserIDsResp{UserIDs: userIDs}, nil
}
if len(userIDs) == 0 {
return &pbconversation.GetConversationNotOfflinePushUserIDsResp{}, nil
}
userIDSet := make(map[string]struct{})
for _, userID := range userIDs {
userIDSet[userID] = struct{}{}
}
resp := &pbconversation.GetConversationNotOfflinePushUserIDsResp{UserIDs: make([]string, 0, len(req.UserIDs))}
for _, userID := range req.UserIDs {
if _, ok := userIDSet[userID]; !ok {
resp.UserIDs = append(resp.UserIDs, userID)
}
}
return resp, nil
} }

@ -38,6 +38,7 @@ const (
recvMsgOptKey = "RECV_MSG_OPT:" recvMsgOptKey = "RECV_MSG_OPT:"
superGroupRecvMsgNotNotifyUserIDsKey = "SUPER_GROUP_RECV_MSG_NOT_NOTIFY_USER_IDS:" superGroupRecvMsgNotNotifyUserIDsKey = "SUPER_GROUP_RECV_MSG_NOT_NOTIFY_USER_IDS:"
superGroupRecvMsgNotNotifyUserIDsHashKey = "SUPER_GROUP_RECV_MSG_NOT_NOTIFY_USER_IDS_HASH:" superGroupRecvMsgNotNotifyUserIDsHashKey = "SUPER_GROUP_RECV_MSG_NOT_NOTIFY_USER_IDS_HASH:"
conversationNotReceiveMessageUserIDsKey = "CONVERSATION_NOT_RECEIVE_MESSAGE_USER_IDS:"
conversationExpireTime = time.Second * 60 * 60 * 12 conversationExpireTime = time.Second * 60 * 60 * 12
) )
@ -83,6 +84,8 @@ type ConversationCache interface {
conversationIDs []string, conversationIDs []string,
) ([]*relationtb.ConversationModel, error) ) ([]*relationtb.ConversationModel, error)
DelConversationByConversationID(conversationIDs ...string) ConversationCache DelConversationByConversationID(conversationIDs ...string) ConversationCache
GetConversationNotReceiveMessageUserIDs(ctx context.Context, conversationID string) ([]string, error)
DelConversationNotReceiveMessageUserIDs(conversationIDs ...string) ConversationCache
} }
func NewConversationRedis( func NewConversationRedis(
@ -153,6 +156,10 @@ func (c *ConversationRedisCache) getConversationHasReadSeqKey(ownerUserID, conve
return conversationHasReadSeqKey + ownerUserID + ":" + conversationID return conversationHasReadSeqKey + ownerUserID + ":" + conversationID
} }
func (c *ConversationRedisCache) getConversationNotReceiveMessageUserIDsKey(conversationID string) string {
return conversationNotReceiveMessageUserIDsKey + conversationID
}
func (c *ConversationRedisCache) GetUserConversationIDs(ctx context.Context, ownerUserID string) ([]string, error) { func (c *ConversationRedisCache) GetUserConversationIDs(ctx context.Context, ownerUserID string) ([]string, error) {
return getCache( return getCache(
ctx, ctx,
@ -432,3 +439,23 @@ func (c *ConversationRedisCache) GetConversationsByConversationID(
func (c *ConversationRedisCache) DelConversationByConversationID(conversationIDs ...string) ConversationCache { func (c *ConversationRedisCache) DelConversationByConversationID(conversationIDs ...string) ConversationCache {
panic("implement me") panic("implement me")
} }
func (c *ConversationRedisCache) GetConversationNotReceiveMessageUserIDs(ctx context.Context, conversationID string) ([]string, error) {
return getCache(
ctx,
c.rcClient,
c.getConversationNotReceiveMessageUserIDsKey(conversationID),
c.expireTime,
func(ctx context.Context) ([]string, error) {
return c.conversationDB.GetConversationNotReceiveMessageUserIDs(ctx, conversationID)
},
)
}
func (c *ConversationRedisCache) DelConversationNotReceiveMessageUserIDs(conversationIDs ...string) ConversationCache {
cache := c.NewCache()
for _, conversationID := range conversationIDs {
cache.AddKeys(c.getConversationNotReceiveMessageUserIDsKey(conversationID))
}
return cache
}

@ -53,7 +53,7 @@ type ConversationDatabase interface {
GetUserAllHasReadSeqs(ctx context.Context, ownerUserID string) (map[string]int64, error) GetUserAllHasReadSeqs(ctx context.Context, ownerUserID string) (map[string]int64, error)
GetConversationsByConversationID(ctx context.Context, conversationIDs []string) ([]*relationtb.ConversationModel, error) GetConversationsByConversationID(ctx context.Context, conversationIDs []string) ([]*relationtb.ConversationModel, error)
GetConversationIDsNeedDestruct(ctx context.Context) ([]*relationtb.ConversationModel, error) GetConversationIDsNeedDestruct(ctx context.Context) ([]*relationtb.ConversationModel, error)
GetConversationNeedOfflinePushUserIDs(ctx context.Context, conversationID string, ownerUserIDs []string) ([]string, error) GetConversationNotReceiveMessageUserIDs(ctx context.Context, conversationID string) ([]string, error)
} }
func NewConversationDatabase(conversation relationtb.ConversationModelInterface, cache cache.ConversationCache, tx tx.Tx) ConversationDatabase { func NewConversationDatabase(conversation relationtb.ConversationModelInterface, cache cache.ConversationCache, tx tx.Tx) ConversationDatabase {
@ -89,6 +89,9 @@ func (c *conversationDatabase) SetUsersConversationFiledTx(ctx context.Context,
cache = cache.DelUserAllHasReadSeqs(userID, conversation.ConversationID) cache = cache.DelUserAllHasReadSeqs(userID, conversation.ConversationID)
} }
} }
if _, ok := filedMap["recv_msg_opt"]; ok {
cache = cache.DelConversationNotReceiveMessageUserIDs(conversation.ConversationID)
}
} }
NotUserIDs := utils.DifferenceString(haveUserIDs, userIDs) NotUserIDs := utils.DifferenceString(haveUserIDs, userIDs)
log.ZDebug(ctx, "SetUsersConversationFiledTx", "NotUserIDs", NotUserIDs, "haveUserIDs", haveUserIDs, "userIDs", userIDs) log.ZDebug(ctx, "SetUsersConversationFiledTx", "NotUserIDs", NotUserIDs, "haveUserIDs", haveUserIDs, "userIDs", userIDs)
@ -122,7 +125,12 @@ func (c *conversationDatabase) UpdateUsersConversationFiled(ctx context.Context,
if err != nil { if err != nil {
return err return err
} }
return c.cache.DelUsersConversation(conversationID, userIDs...).ExecDel(ctx) cache := c.cache.NewCache()
cache = cache.DelUsersConversation(conversationID, userIDs...)
if _, ok := args["recv_msg_opt"]; ok {
cache = cache.DelConversationNotReceiveMessageUserIDs(conversationID)
}
return cache.ExecDel(ctx)
} }
func (c *conversationDatabase) CreateConversation(ctx context.Context, conversations []*relationtb.ConversationModel) error { func (c *conversationDatabase) CreateConversation(ctx context.Context, conversations []*relationtb.ConversationModel) error {
@ -133,6 +141,7 @@ func (c *conversationDatabase) CreateConversation(ctx context.Context, conversat
cache := c.cache.NewCache() cache := c.cache.NewCache()
for _, conversation := range conversations { for _, conversation := range conversations {
cache = cache.DelConversations(conversation.OwnerUserID, conversation.ConversationID) cache = cache.DelConversations(conversation.OwnerUserID, conversation.ConversationID)
cache = cache.DelConversationNotReceiveMessageUserIDs(conversation.ConversationID)
userIDs = append(userIDs, conversation.OwnerUserID) userIDs = append(userIDs, conversation.OwnerUserID)
} }
return cache.DelConversationIDs(userIDs...).DelUserConversationIDsHash(userIDs...).ExecDel(ctx) return cache.DelConversationIDs(userIDs...).DelUserConversationIDsHash(userIDs...).ExecDel(ctx)
@ -225,7 +234,7 @@ func (c *conversationDatabase) SetUserConversations(ctx context.Context, ownerUs
if err != nil { if err != nil {
return err return err
} }
cache = cache.DelConversationIDs(ownerUserID).DelUserConversationIDsHash(ownerUserID) cache = cache.DelConversationIDs(ownerUserID).DelUserConversationIDsHash(ownerUserID).DelConversationNotReceiveMessageUserIDs(utils.Slice(notExistConversations, func(e *relationtb.ConversationModel) string { return e.ConversationID })...)
} }
return nil return nil
}); err != nil { }); err != nil {
@ -251,7 +260,7 @@ func (c *conversationDatabase) CreateGroupChatConversation(ctx context.Context,
for _, v := range notExistUserIDs { for _, v := range notExistUserIDs {
conversation := relationtb.ConversationModel{ConversationType: constant.SuperGroupChatType, GroupID: groupID, OwnerUserID: v, ConversationID: conversationID} conversation := relationtb.ConversationModel{ConversationType: constant.SuperGroupChatType, GroupID: groupID, OwnerUserID: v, ConversationID: conversationID}
conversations = append(conversations, &conversation) conversations = append(conversations, &conversation)
cache = cache.DelConversations(v, conversationID) cache = cache.DelConversations(v, conversationID).DelConversationNotReceiveMessageUserIDs(conversationID)
} }
cache = cache.DelConversationIDs(notExistUserIDs...).DelUserConversationIDsHash(notExistUserIDs...) cache = cache.DelConversationIDs(notExistUserIDs...).DelUserConversationIDsHash(notExistUserIDs...)
if len(conversations) > 0 { if len(conversations) > 0 {
@ -298,6 +307,6 @@ func (c *conversationDatabase) GetConversationIDsNeedDestruct(ctx context.Contex
return c.conversationDB.GetConversationIDsNeedDestruct(ctx) return c.conversationDB.GetConversationIDsNeedDestruct(ctx)
} }
func (c *conversationDatabase) GetConversationNeedOfflinePushUserIDs(ctx context.Context, conversationID string, ownerUserIDs []string) ([]string, error) { func (c *conversationDatabase) GetConversationNotReceiveMessageUserIDs(ctx context.Context, conversationID string) ([]string, error) {
return c.conversationDB.GetConversationNeedOfflinePushUserIDs(ctx, conversationID, ownerUserIDs) return c.cache.GetConversationNotReceiveMessageUserIDs(ctx, conversationID)
} }

@ -217,13 +217,23 @@ func (c *ConversationGorm) GetConversationIDsNeedDestruct(
) )
} }
func (c *ConversationGorm) GetConversationNeedOfflinePushUserIDs(ctx context.Context, conversationID string, ownerUserIDs []string) ([]string, error) { func (c *ConversationGorm) GetConversationRecvMsgOpt(ctx context.Context, userID string, conversationID string) (int32, error) {
var recvMsgOpt int32
return recvMsgOpt, errs.Wrap(
c.db(ctx).
Model(&relation.ConversationModel{}).
Where("conversation_id = ? and owner_user_id in ?", conversationID, userID).
Pluck("recv_msg_opt", &recvMsgOpt).
Error,
)
}
func (c *ConversationGorm) GetConversationNotReceiveMessageUserIDs(ctx context.Context, conversationID string) ([]string, error) {
var userIDs []string var userIDs []string
return userIDs, errs.Wrap( return userIDs, errs.Wrap(
c.db(ctx). c.db(ctx).
Model(&relation.ConversationModel{}). Model(&relation.ConversationModel{}).
Where("conversation_id = ? and owner_user_id in ? and recv_msg_opt = ?", conversationID, ownerUserIDs, constant.ReceiveMessage). Where("conversation_id = ? and recv_msg_opt <> ?", conversationID, constant.ReceiveMessage).
Pluck("owner_user_id", &userIDs). Pluck("owner_user_id", &userIDs).Error,
Error,
) )
} }

@ -66,6 +66,6 @@ type ConversationModelInterface interface {
GetUserAllHasReadSeqs(ctx context.Context, ownerUserID string) (hashReadSeqs map[string]int64, err error) GetUserAllHasReadSeqs(ctx context.Context, ownerUserID string) (hashReadSeqs map[string]int64, err error)
GetConversationsByConversationID(ctx context.Context, conversationIDs []string) ([]*ConversationModel, error) GetConversationsByConversationID(ctx context.Context, conversationIDs []string) ([]*ConversationModel, error)
GetConversationIDsNeedDestruct(ctx context.Context) ([]*ConversationModel, error) GetConversationIDsNeedDestruct(ctx context.Context) ([]*ConversationModel, error)
GetConversationNeedOfflinePushUserIDs(ctx context.Context, conversationID string, ownerUserIDs []string) ([]string, error) GetConversationNotReceiveMessageUserIDs(ctx context.Context, conversationID string) ([]string, error)
NewTx(tx any) ConversationModelInterface NewTx(tx any) ConversationModelInterface
} }

Loading…
Cancel
Save