diff --git a/internal/msgtransfer/modify_msg_handler.go b/internal/msgtransfer/modify_msg_handler.go index beeb48d3e..948f43df6 100644 --- a/internal/msgtransfer/modify_msg_handler.go +++ b/internal/msgtransfer/modify_msg_handler.go @@ -40,12 +40,12 @@ func (ModifyMsgConsumerHandler) Cleanup(_ sarama.ConsumerGroupSession) error { r func (mmc *ModifyMsgConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { for msg := range claim.Messages() { - log.NewDebug("", "kafka get info to mysql", "ModifyMsgConsumerHandler", msg.Topic, "msgPartition", msg.Partition, "msg", string(msg.Value), "key", string(msg.Key)) + ctx := mmc.modifyMsgConsumerGroup.GetContextFromMsg(msg) + log.ZDebug(ctx, "kafka get info to mysql", "ModifyMsgConsumerHandler", msg.Topic, "msgPartition", msg.Partition, "msg", string(msg.Value), "key", string(msg.Key)) if len(msg.Value) != 0 { - ctx := mmc.modifyMsgConsumerGroup.GetContextFromMsg(msg) mmc.ModifyMsg(ctx, msg, string(msg.Key), sess) } else { - log.Error("", "msg get from kafka but is nil", msg.Key) + log.ZError(ctx, "msg get from kafka but is nil", nil, "key", msg.Key) } sess.MarkMessage(msg, "") } diff --git a/internal/msgtransfer/persistent_msg_handler.go b/internal/msgtransfer/persistent_msg_handler.go index c355a4caa..6d9b64a13 100644 --- a/internal/msgtransfer/persistent_msg_handler.go +++ b/internal/msgtransfer/persistent_msg_handler.go @@ -14,7 +14,6 @@ import ( "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/controller" kfk "github.com/OpenIMSDK/Open-IM-Server/pkg/common/kafka" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/log" - "github.com/OpenIMSDK/Open-IM-Server/pkg/common/mcontext" pbMsg "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/msg" "github.com/OpenIMSDK/Open-IM-Server/pkg/utils" @@ -38,16 +37,15 @@ func NewPersistentConsumerHandler(database controller.ChatLogDatabase) *Persiste func (pc *PersistentConsumerHandler) handleChatWs2Mysql(ctx context.Context, cMsg *sarama.ConsumerMessage, msgKey string, _ sarama.ConsumerGroupSession) { msg := cMsg.Value - operationID := mcontext.GetOperationID(ctx) - log.NewInfo("msg come here mysql!!!", "", "msg", string(msg), msgKey) var tag bool msgFromMQ := pbMsg.MsgDataToMQ{} err := proto.Unmarshal(msg, &msgFromMQ) if err != nil { - log.NewError(operationID, "msg_transfer Unmarshal msg err", "msg", string(msg), "err", err.Error()) + log.ZError(ctx, "msg_transfer Unmarshal msg err", err) return } - log.Debug(operationID, "proto.Unmarshal MsgDataToMQ", msgFromMQ.String()) + return + log.ZDebug(ctx, "handleChatWs2Mysql", "msg", msgFromMQ.MsgData) //Control whether to store history messages (mysql) isPersist := utils.GetSwitchFromOptions(msgFromMQ.MsgData.Options, constant.IsPersistent) //Only process receiver data @@ -65,9 +63,9 @@ func (pc *PersistentConsumerHandler) handleChatWs2Mysql(ctx context.Context, cMs tag = true } if tag { - log.NewInfo(operationID, "msg_transfer msg persisting", string(msg)) + log.ZInfo(ctx, "msg_transfer msg persisting", "msg", string(msg)) if err = pc.chatLogDatabase.CreateChatLog(&msgFromMQ); err != nil { - log.NewError(operationID, "Message insert failed", "err", err.Error(), "msg", msgFromMQ.String()) + log.ZError(ctx, "Message insert failed", err, "msg", msgFromMQ.String()) return } } @@ -77,12 +75,12 @@ func (PersistentConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error { func (PersistentConsumerHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil } func (pc *PersistentConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { for msg := range claim.Messages() { - log.NewDebug("", "kafka get info to mysql", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "msg", string(msg.Value), "key", string(msg.Key)) + ctx := pc.persistentConsumerGroup.GetContextFromMsg(msg) + log.ZDebug(ctx, "kafka get info to mysql", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "msg", string(msg.Value), "key", string(msg.Key)) if len(msg.Value) != 0 { - ctx := pc.persistentConsumerGroup.GetContextFromMsg(msg) pc.handleChatWs2Mysql(ctx, msg, string(msg.Key), sess) } else { - log.Error("", "msg get from kafka but is nil", msg.Key) + log.ZError(ctx, "msg get from kafka but is nil", nil, "key", msg.Key) } sess.MarkMessage(msg, "") } diff --git a/internal/push/push_to_client.go b/internal/push/push_to_client.go index 54844678b..8c218f5a0 100644 --- a/internal/push/push_to_client.go +++ b/internal/push/push_to_client.go @@ -2,6 +2,7 @@ package push import ( "context" + "encoding/json" "errors" "github.com/OpenIMSDK/Open-IM-Server/internal/push/offlinepush" @@ -101,6 +102,16 @@ func (p *Pusher) Push2User(ctx context.Context, userIDs []string, msg *sdkws.Msg return nil } +func (p *Pusher) UnmarshalNotificationElem(bytes []byte, t interface{}) error { + var notificationElem struct { + Detail string `json:"detail,omitempty"` + } + if err := json.Unmarshal(bytes, ¬ificationElem); err != nil { + return err + } + return json.Unmarshal([]byte(notificationElem.Detail), t) +} + func (p *Pusher) Push2SuperGroup(ctx context.Context, groupID string, msg *sdkws.MsgData) (err error) { operationID := mcontext.GetOperationID(ctx) log.Debug(operationID, "Get super group msg from msg_transfer And push msg", msg.String(), groupID) @@ -113,6 +124,21 @@ func (p *Pusher) Push2SuperGroup(ctx context.Context, groupID string, msg *sdkws if err != nil { return err } + switch msg.ContentType { + case constant.MemberQuitNotification: + var tips sdkws.MemberQuitTips + if p.UnmarshalNotificationElem(msg.Content, &tips) != nil { + return err + } + pushToUserIDs = append(pushToUserIDs, tips.QuitUser.UserID) + case constant.MemberKickedNotification: + var tips sdkws.MemberKickedTips + if p.UnmarshalNotificationElem(msg.Content, &tips) != nil { + return err + } + kickedUsers := utils.Slice(tips.KickedUserList, func(e *sdkws.GroupMemberFullInfo) string { return e.UserID }) + pushToUserIDs = append(pushToUserIDs, kickedUsers...) + } } wsResults, err := p.GetConnsAndOnlinePush(ctx, msg, pushToUserIDs) if err != nil { diff --git a/internal/rpc/conversation/conversaion.go b/internal/rpc/conversation/conversaion.go index 4649409b7..99a18baac 100644 --- a/internal/rpc/conversation/conversaion.go +++ b/internal/rpc/conversation/conversaion.go @@ -50,7 +50,6 @@ func Start(client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) e } func (c *conversationServer) GetConversation(ctx context.Context, req *pbConversation.GetConversationReq) (*pbConversation.GetConversationResp, error) { - resp := &pbConversation.GetConversationResp{Conversation: &pbConversation.Conversation{}} conversations, err := c.conversationDatabase.FindConversations(ctx, req.OwnerUserID, []string{req.ConversationID}) if err != nil { return nil, err @@ -58,16 +57,17 @@ func (c *conversationServer) GetConversation(ctx context.Context, req *pbConvers if len(conversations) < 1 { return nil, errs.ErrRecordNotFound.Wrap("conversation not found") } + resp := &pbConversation.GetConversationResp{Conversation: &pbConversation.Conversation{}} resp.Conversation = convert.ConversationDB2Pb(conversations[0]) return resp, nil } func (c *conversationServer) GetAllConversations(ctx context.Context, req *pbConversation.GetAllConversationsReq) (*pbConversation.GetAllConversationsResp, error) { - resp := &pbConversation.GetAllConversationsResp{Conversations: []*pbConversation.Conversation{}} conversations, err := c.conversationDatabase.GetUserAllConversation(ctx, req.OwnerUserID) if err != nil { return nil, err } + resp := &pbConversation.GetAllConversationsResp{Conversations: []*pbConversation.Conversation{}} resp.Conversations = convert.ConversationsDB2Pb(conversations) return resp, nil } @@ -297,7 +297,7 @@ func (c *conversationServer) GetConversationIDs(ctx context.Context, req *pbConv } func (c *conversationServer) GetConversationsHasReadAndMaxSeq(ctx context.Context, req *pbConversation.GetConversationsHasReadAndMaxSeqReq) (*pbConversation.GetConversationsHasReadAndMaxSeqResp, error) { - conversations, err := c.conversationDatabase.GetUserAllConversation(ctx, req.UserID) + conversations, err := c.conversationDatabase.GetUserAllHasReadSeqs(ctx, req.UserID) if err != nil { return nil, err } @@ -306,10 +306,10 @@ func (c *conversationServer) GetConversationsHasReadAndMaxSeq(ctx context.Contex return nil, err } resp := &pbConversation.GetConversationsHasReadAndMaxSeqResp{Seqs: make(map[string]*pbConversation.Seqs)} - for _, v := range conversations { - resp.Seqs[v.ConversationID] = &pbConversation.Seqs{ - HasReadSeq: v.HasReadSeq, - MaxSeq: maxSeqs.MaxSeqs[v.ConversationID], + for conversationID, seq := range conversations { + resp.Seqs[conversationID] = &pbConversation.Seqs{ + HasReadSeq: seq, + MaxSeq: maxSeqs.MaxSeqs[conversationID], } } return resp, nil diff --git a/internal/rpc/msg/send_pull.go b/internal/rpc/msg/send_pull.go index b2215ec6c..e156650fe 100644 --- a/internal/rpc/msg/send_pull.go +++ b/internal/rpc/msg/send_pull.go @@ -21,7 +21,7 @@ func (m *msgServer) sendMsgSuperGroupChat(ctx context.Context, req *msg.SendMsgR promePkg.Inc(promePkg.WorkSuperGroupChatMsgProcessFailedCounter) return nil, err } - err = m.MsgDatabase.MsgToMQ(ctx, req.MsgData.GroupID, req.MsgData) + err = m.MsgDatabase.MsgToMQ(ctx, utils.GetConversationIDByMsg(req.MsgData), req.MsgData) if err != nil { return nil, err } diff --git a/pkg/common/db/cache/conversation.go b/pkg/common/db/cache/conversation.go index 79d9f4a38..f50f6decb 100644 --- a/pkg/common/db/cache/conversation.go +++ b/pkg/common/db/cache/conversation.go @@ -18,6 +18,7 @@ const ( conversationKey = "CONVERSATION:" conversationIDsKey = "CONVERSATION_IDS:" conversationIDsHashKey = "CONVERSATION_IDS_HASH:" + conversationHasReadSeqKey = "CONVERSATION_HAS_READ_SEQ:" recvMsgOptKey = "RECV_MSG_OPT:" superGroupRecvMsgNotNotifyUserIDsKey = "SUPER_GROUP_RECV_MSG_NOT_NOTIFY_USER_IDS:" superGroupRecvMsgNotNotifyUserIDsHashKey = "SUPER_GROUP_RECV_MSG_NOT_NOTIFY_USER_IDS_HASH:" @@ -53,6 +54,9 @@ type ConversationCache interface { // get one super group recv msg but do not notification userID list hash GetSuperGroupRecvMsgNotNotifyUserIDsHash(ctx context.Context, groupID string) (hash uint64, err error) DelSuperGroupRecvMsgNotNotifyUserIDsHash(groupID string) ConversationCache + + GetUserAllHasReadSeqs(ctx context.Context, ownerUserID string) (map[string]int64, error) + DelUserAllHasReadSeqs(ownerUserID string, conversationIDs ...string) ConversationCache } func NewConversationRedis(rdb redis.UniversalClient, opts rockscache.Options, db relationTb.ConversationModelInterface) ConversationCache { @@ -96,6 +100,22 @@ func (c *ConversationRedisCache) getSuperGroupRecvNotNotifyUserIDsHashKey(groupI return superGroupRecvMsgNotNotifyUserIDsHashKey + groupID } +func (c *ConversationRedisCache) getConversationHasReadSeqKey(ownerUserID, conversationID string) string { + return conversationHasReadSeqKey + ownerUserID + ":" + conversationID +} + +func (c *ConversationRedisCache) getAllConversationIDsKeys(ctx context.Context, ownerUserID string) ([]string, []string, error) { + conversationIDs, err := c.GetUserConversationIDs(ctx, ownerUserID) + if err != nil { + return nil, nil, err + } + var keys []string + for _, conversarionID := range conversationIDs { + keys = append(keys, c.getConversationKey(ownerUserID, conversarionID)) + } + return keys, conversationIDs, nil +} + func (c *ConversationRedisCache) GetUserConversationIDs(ctx context.Context, ownerUserID string) ([]string, error) { return getCache(ctx, c.rcClient, c.getConversationIDsKey(ownerUserID), c.expireTime, func(ctx context.Context) ([]string, error) { return c.conversationDB.FindUserIDAllConversationID(ctx, ownerUserID) @@ -155,7 +175,7 @@ func (c *ConversationRedisCache) DelConvsersations(ownerUserID string, convsersa return cache } -func (c *ConversationRedisCache) GetConversationIndex(convsation *relationTb.ConversationModel, keys []string) (int, error) { +func (c *ConversationRedisCache) getConversationIndex(convsation *relationTb.ConversationModel, keys []string) (int, error) { key := c.getConversationKey(convsation.OwnerUserID, convsation.ConversationID) for _i, _key := range keys { if _key == key { @@ -170,21 +190,17 @@ func (c *ConversationRedisCache) GetConversations(ctx context.Context, ownerUser for _, conversarionID := range conversationIDs { keys = append(keys, c.getConversationKey(ownerUserID, conversarionID)) } - return batchGetCache(ctx, c.rcClient, keys, c.expireTime, c.GetConversationIndex, func(ctx context.Context) ([]*relationTb.ConversationModel, error) { + return batchGetCache(ctx, c.rcClient, keys, c.expireTime, c.getConversationIndex, func(ctx context.Context) ([]*relationTb.ConversationModel, error) { return c.conversationDB.Find(ctx, ownerUserID, conversationIDs) }) } func (c *ConversationRedisCache) GetUserAllConversations(ctx context.Context, ownerUserID string) ([]*relationTb.ConversationModel, error) { - conversationIDs, err := c.GetUserConversationIDs(ctx, ownerUserID) + keys, _, err := c.getAllConversationIDsKeys(ctx, ownerUserID) if err != nil { return nil, err } - var keys []string - for _, conversarionID := range conversationIDs { - keys = append(keys, c.getConversationKey(ownerUserID, conversarionID)) - } - return batchGetCache(ctx, c.rcClient, keys, c.expireTime, c.GetConversationIndex, func(ctx context.Context) ([]*relationTb.ConversationModel, error) { + return batchGetCache(ctx, c.rcClient, keys, c.expireTime, c.getConversationIndex, func(ctx context.Context) ([]*relationTb.ConversationModel, error) { return c.conversationDB.FindUserIDAllConversations(ctx, ownerUserID) }) } @@ -241,3 +257,30 @@ func (c *ConversationRedisCache) DelSuperGroupRecvMsgNotNotifyUserIDsHash(groupI cache.AddKeys(c.getSuperGroupRecvNotNotifyUserIDsHashKey(groupID)) return cache } + +func (c *ConversationRedisCache) getUserAllHasReadSeqsIndex(conversationID string, conversationIDs []string) (int, error) { + for _i, _conversationID := range conversationIDs { + if _conversationID == conversationID { + return _i, nil + } + } + return 0, errors.New("not found key:" + conversationID + " in keys") +} + +func (c *ConversationRedisCache) GetUserAllHasReadSeqs(ctx context.Context, ownerUserID string) (map[string]int64, error) { + keys, conversationIDs, err := c.getAllConversationIDsKeys(ctx, ownerUserID) + if err != nil { + return nil, err + } + return batchGetCacheMap(ctx, c.rcClient, keys, conversationIDs, c.expireTime, c.getUserAllHasReadSeqsIndex, func(ctx context.Context) (map[string]int64, error) { + return c.conversationDB.GetUserAllHasReadSeqs(ctx, ownerUserID) + }) +} + +func (c *ConversationRedisCache) DelUserAllHasReadSeqs(ownerUserID string, conversationIDs ...string) ConversationCache { + cache := c.NewCache() + for _, conversationID := range conversationIDs { + cache.AddKeys(c.getConversationHasReadSeqKey(ownerUserID, conversationID)) + } + return cache +} diff --git a/pkg/common/db/cache/meta_cache.go b/pkg/common/db/cache/meta_cache.go index 5866b5b82..b001fa227 100644 --- a/pkg/common/db/cache/meta_cache.go +++ b/pkg/common/db/cache/meta_cache.go @@ -153,15 +153,15 @@ func batchGetCache[T any](ctx context.Context, rcClient *rockscache.Client, keys return tArrays, nil } -func batchGetCacheMap[T any](ctx context.Context, rcClient *rockscache.Client, keys []string, originKeys []string, expire time.Duration, keyIndexFn func(t T, keys []string) (int, error), fn func(ctx context.Context) (map[string]T, error)) (map[string]T, error) { +func batchGetCacheMap[T any](ctx context.Context, rcClient *rockscache.Client, keys, originKeys []string, expire time.Duration, keyIndexFn func(s string, keys []string) (int, error), fn func(ctx context.Context) (map[string]T, error)) (map[string]T, error) { batchMap, err := rcClient.FetchBatch2(ctx, keys, expire, func(idxs []int) (m map[int]string, err error) { - values := make(map[int]string) tArrays, err := fn(ctx) if err != nil { return nil, err } - for _, v := range tArrays { - index, err := keyIndexFn(v, keys) + values := make(map[int]string) + for k, v := range tArrays { + index, err := keyIndexFn(k, originKeys) if err != nil { continue } @@ -184,7 +184,7 @@ func batchGetCacheMap[T any](ctx context.Context, rcClient *rockscache.Client, k if err != nil { return nil, utils.Wrap(err, "unmarshal failed") } - tMap[keys[i]] = t + tMap[originKeys[i]] = t } } return tMap, nil diff --git a/pkg/common/db/cache/msg.go b/pkg/common/db/cache/msg.go index a53e5a5e8..6d56dba4a 100644 --- a/pkg/common/db/cache/msg.go +++ b/pkg/common/db/cache/msg.go @@ -251,14 +251,14 @@ func (c *msgCache) GetMessagesBySeq(ctx context.Context, conversationID string, failedSeqs = append(failedSeqs, seqs[i]) } else { msg := sdkws.MsgData{} - err = jsonpb.UnmarshalString(cmd.Val(), &msg) + err = utils.String2Pb(cmd.Val(), &msg) if err == nil { if msg.Status != constant.MsgDeleted { seqMsgs = append(seqMsgs, &msg) continue } } else { - log.ZWarn(ctx, "UnmarshalString failed", err, "conversationID", conversationID, "seq", seqs[i]) + log.ZWarn(ctx, "UnmarshalString failed", err, "conversationID", conversationID, "seq", seqs[i], "msg", cmd.Val()) } failedSeqs = append(failedSeqs, seqs[i]) } diff --git a/pkg/common/db/controller/conversation.go b/pkg/common/db/controller/conversation.go index cc27f8f73..95dfd9a4c 100644 --- a/pkg/common/db/controller/conversation.go +++ b/pkg/common/db/controller/conversation.go @@ -32,6 +32,7 @@ type ConversationDatabase interface { GetConversationIDs(ctx context.Context, userID string) ([]string, error) GetUserConversationIDsHash(ctx context.Context, ownerUserID string) (hash uint64, err error) GetAllConversationIDs(ctx context.Context) ([]string, error) + GetUserAllHasReadSeqs(ctx context.Context, ownerUserID string) (map[string]int64, error) } func NewConversationDatabase(conversation relationTb.ConversationModelInterface, cache cache.ConversationCache, tx tx.Tx) ConversationDatabase { @@ -252,3 +253,7 @@ func (c *ConversationDataBase) GetUserConversationIDsHash(ctx context.Context, o func (c *ConversationDataBase) GetAllConversationIDs(ctx context.Context) ([]string, error) { return c.conversationDB.GetAllConversationIDs(ctx) } + +func (c *ConversationDataBase) GetUserAllHasReadSeqs(ctx context.Context, ownerUserID string) (map[string]int64, error) { + return c.cache.GetUserAllHasReadSeqs(ctx, ownerUserID) +} diff --git a/pkg/common/db/relation/conversation_model.go b/pkg/common/db/relation/conversation_model.go index 09b7599da..f6cd94317 100644 --- a/pkg/common/db/relation/conversation_model.go +++ b/pkg/common/db/relation/conversation_model.go @@ -80,3 +80,13 @@ func (c *ConversationGorm) GetUserRecvMsgOpt(ctx context.Context, ownerUserID, c func (c *ConversationGorm) GetAllConversationIDs(ctx context.Context) (conversationIDs []string, err error) { return conversationIDs, utils.Wrap(c.db(ctx).Distinct("conversation_id").Pluck("conversation_id", &conversationIDs).Error, "") } + +func (c *ConversationGorm) GetUserAllHasReadSeqs(ctx context.Context, ownerUserID string) (hasReadSeqs map[string]int64, err error) { + var conversations []*relation.ConversationModel + err = utils.Wrap(c.db(ctx).Where("owner_user_id = ?", ownerUserID).Select("conversation_id", "has_read_seq").Find(&conversations).Error, "") + hasReadSeqs = make(map[string]int64, len(conversations)) + for _, conversation := range conversations { + hasReadSeqs[conversation.ConversationID] = conversation.HasReadSeq + } + return hasReadSeqs, err +} diff --git a/pkg/common/db/table/relation/conversation.go b/pkg/common/db/table/relation/conversation.go index 0f44d096c..d5fc2bea0 100644 --- a/pkg/common/db/table/relation/conversation.go +++ b/pkg/common/db/table/relation/conversation.go @@ -45,5 +45,6 @@ type ConversationModelInterface interface { GetUserRecvMsgOpt(ctx context.Context, ownerUserID, conversationID string) (opt int, err error) FindSuperGroupRecvMsgNotNotifyUserIDs(ctx context.Context, groupID string) ([]string, error) GetAllConversationIDs(ctx context.Context) ([]string, error) + GetUserAllHasReadSeqs(ctx context.Context, ownerUserID string) (hashReadSeqs map[string]int64, err error) NewTx(tx any) ConversationModelInterface } diff --git a/pkg/rpcclient/notification/group.go b/pkg/rpcclient/notification/group.go index 5f4453a83..054ed744f 100644 --- a/pkg/rpcclient/notification/group.go +++ b/pkg/rpcclient/notification/group.go @@ -374,29 +374,28 @@ func (g *GroupNotificationSender) MemberQuitNotification(ctx context.Context, re if err != nil { return err } - // todo 退群后查不到 opUserID := mcontext.GetOpUserID(ctx) user, err := g.getUser(ctx, opUserID) if err != nil { return err } - userIDs, err := g.getGroupOwnerAndAdminUserID(ctx, req.GroupID) - if err != nil { - return err - } + //userIDs, err := g.getGroupOwnerAndAdminUserID(ctx, req.GroupID) + //if err != nil { + // return err + //} tips := &sdkws.MemberQuitTips{Group: group, QuitUser: &sdkws.GroupMemberFullInfo{ GroupID: group.GroupID, UserID: user.UserID, Nickname: user.Nickname, FaceURL: user.FaceURL, }} - for _, userID := range append(userIDs, opUserID) { - err = g.msgClient.Notification(ctx, mcontext.GetOpUserID(ctx), userID, constant.MemberQuitNotification, tips) - if err != nil { - return err - } - } - return nil + //for _, userID := range append(userIDs, opUserID) { + // err = g.msgClient.Notification(ctx, mcontext.GetOpUserID(ctx), userID, constant.MemberQuitNotification, tips) + // if err != nil { + // log.ZError(ctx, "MemberQuitNotification failed", err, "group", req.GroupID, "userID", userID) + // } + //} + return g.msgClient.Notification(ctx, mcontext.GetOpUserID(ctx), req.GroupID, constant.MemberQuitNotification, tips) } func (g *GroupNotificationSender) GroupApplicationAcceptedNotification(ctx context.Context, req *pbGroup.GroupApplicationResponseReq) (err error) { @@ -614,11 +613,16 @@ func (g *GroupNotificationSender) GroupMutedNotification(ctx context.Context, gr if err != nil { return err } - user, err := g.getGroupMember(ctx, groupID, mcontext.GetOpUserID(ctx)) + users, err := g.getGroupMembers(ctx, groupID, []string{mcontext.GetOpUserID(ctx)}) if err != nil { return err } - tips := &sdkws.GroupMutedTips{Group: group, OpUser: user} + tips := &sdkws.GroupMutedTips{Group: group} + if len(users) > 0 { + tips.OpUser = users[0] + } else { + tips.OpUser = &sdkws.GroupMemberFullInfo{UserID: mcontext.GetOpUserID(ctx), GroupID: groupID} + } return g.msgClient.Notification(ctx, mcontext.GetOpUserID(ctx), group.GroupID, constant.GroupMutedNotification, tips) } @@ -633,12 +637,17 @@ func (g *GroupNotificationSender) GroupCancelMutedNotification(ctx context.Conte if err != nil { return err } - user, err := g.getGroupMember(ctx, groupID, mcontext.GetOpUserID(ctx)) + users, err := g.getGroupMembers(ctx, groupID, []string{mcontext.GetOpUserID(ctx)}) if err != nil { return err } - tips := &sdkws.GroupCancelMutedTips{Group: group, OpUser: user} - return g.msgClient.Notification(ctx, mcontext.GetOpUserID(ctx), group.GroupID, constant.GroupMutedNotification, tips) + tips := &sdkws.GroupCancelMutedTips{Group: group} + if len(users) > 0 { + tips.OpUser = users[0] + } else { + tips.OpUser = &sdkws.GroupMemberFullInfo{UserID: mcontext.GetOpUserID(ctx), GroupID: groupID} + } + return g.msgClient.Notification(ctx, mcontext.GetOpUserID(ctx), group.GroupID, constant.GroupCancelMutedNotification, tips) } func (g *GroupNotificationSender) GroupMemberInfoSetNotification(ctx context.Context, groupID, groupMemberUserID string) (err error) {