From ee5e9a0f9025853c346683b46f4743a93e43db73 Mon Sep 17 00:00:00 2001 From: Gordon <1432970085@qq.com> Date: Tue, 27 Jun 2023 20:17:35 +0800 Subject: [PATCH 1/6] feat: conversation set @ type --- internal/rpc/msg/send.go | 48 ++++++++++++++++++++++++++++++++++- pkg/rpcclient/conversation.go | 4 +++ 2 files changed, 51 insertions(+), 1 deletion(-) diff --git a/internal/rpc/msg/send.go b/internal/rpc/msg/send.go index 89f015315..6046c086c 100644 --- a/internal/rpc/msg/send.go +++ b/internal/rpc/msg/send.go @@ -2,12 +2,15 @@ package msg import ( "context" - "github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/log" + "github.com/OpenIMSDK/Open-IM-Server/pkg/common/mcontext" promePkg "github.com/OpenIMSDK/Open-IM-Server/pkg/common/prome" "github.com/OpenIMSDK/Open-IM-Server/pkg/errs" + pbConversation "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/conversation" pbMsg "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/msg" + "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws" + "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/wrapperspb" "github.com/OpenIMSDK/Open-IM-Server/pkg/utils" ) @@ -46,6 +49,9 @@ func (m *msgServer) sendMsgSuperGroupChat(ctx context.Context, req *pbMsg.SendMs if err != nil { return nil, err } + if req.MsgData.ContentType == constant.AtText { + go m.setConversationAtInfo(ctx, req.MsgData) + } if err = callbackAfterSendGroupMsg(ctx, req); err != nil { log.ZWarn(ctx, "CallbackAfterSendGroupMsg", err) } @@ -56,6 +62,46 @@ func (m *msgServer) sendMsgSuperGroupChat(ctx context.Context, req *pbMsg.SendMs resp.ClientMsgID = req.MsgData.ClientMsgID return resp, nil } +func (m *msgServer) setConversationAtInfo(nctx context.Context, msg *sdkws.MsgData) { + ctx := mcontext.NewCtx("@@@" + mcontext.GetOperationID(nctx)) + var atUserID []string + conversation := &pbConversation.ConversationReq{ + ConversationID: utils.GetConversationIDByMsg(msg), + ConversationType: msg.SessionType, + GroupID: msg.GroupID, + } + tagAll := utils.IsContain(constant.AtAllString, msg.AtUserIDList) + if tagAll { + memberUserIDList, err := m.Group.GetGroupMemberIDs(ctx, msg.GroupID) + if err != nil { + log.ZWarn(ctx, "GetGroupMemberIDs", err) + return + } + atUserID = utils.DifferenceString([]string{constant.AtAllString}, msg.AtUserIDList) + if len(atUserID) == 0 { //just @everyone + conversation.GroupAtType = &wrapperspb.Int32Value{Value: constant.AtAll} + } else { //@Everyone and @other people + conversation.GroupAtType = &wrapperspb.Int32Value{Value: constant.AtAllAtMe} + err := m.Conversation.SetConversations(ctx, atUserID, conversation) + if err != nil { + log.ZWarn(ctx, "SetConversations", err, atUserID, conversation) + } + memberUserIDList = utils.DifferenceString(atUserID, memberUserIDList) + } + conversation.GroupAtType = &wrapperspb.Int32Value{Value: constant.AtAll} + err = m.Conversation.SetConversations(ctx, memberUserIDList, conversation) + if err != nil { + log.ZWarn(ctx, "SetConversations", err, memberUserIDList, conversation) + } + } else { + conversation.GroupAtType = &wrapperspb.Int32Value{Value: constant.AtMe} + err := m.Conversation.SetConversations(ctx, msg.AtUserIDList, conversation) + if err != nil { + log.ZWarn(ctx, "SetConversations", err, msg.AtUserIDList, conversation) + } + } + +} func (m *msgServer) sendMsgNotification(ctx context.Context, req *pbMsg.SendMsgReq) (resp *pbMsg.SendMsgResp, err error) { promePkg.Inc(promePkg.SingleChatMsgRecvSuccessCounter) diff --git a/pkg/rpcclient/conversation.go b/pkg/rpcclient/conversation.go index a41565558..e9a7b1ea4 100644 --- a/pkg/rpcclient/conversation.go +++ b/pkg/rpcclient/conversation.go @@ -63,6 +63,10 @@ func (c *ConversationRpcClient) SetConversationMaxSeq(ctx context.Context, owner _, err := c.Client.SetConversationMaxSeq(ctx, &pbConversation.SetConversationMaxSeqReq{OwnerUserID: ownerUserIDs, ConversationID: conversationID, MaxSeq: maxSeq}) return err } +func (c *ConversationRpcClient) SetConversations(ctx context.Context, userIDs []string, conversation *pbConversation.ConversationReq) error { + _, err := c.Client.SetConversations(ctx, &pbConversation.SetConversationsReq{UserIDs: userIDs, Conversation: conversation}) + return err +} func (c *ConversationRpcClient) GetConversationIDs(ctx context.Context, ownerUserID string) ([]string, error) { resp, err := c.Client.GetConversationIDs(ctx, &pbConversation.GetConversationIDsReq{UserID: ownerUserID}) From 3470fd57a39abdb06b512337a4a34c757bbc1c86 Mon Sep 17 00:00:00 2001 From: Gordon <1432970085@qq.com> Date: Tue, 27 Jun 2023 20:45:51 +0800 Subject: [PATCH 2/6] test: log --- internal/rpc/msg/send.go | 1 + 1 file changed, 1 insertion(+) diff --git a/internal/rpc/msg/send.go b/internal/rpc/msg/send.go index 6046c086c..bb51c2425 100644 --- a/internal/rpc/msg/send.go +++ b/internal/rpc/msg/send.go @@ -63,6 +63,7 @@ func (m *msgServer) sendMsgSuperGroupChat(ctx context.Context, req *pbMsg.SendMs return resp, nil } func (m *msgServer) setConversationAtInfo(nctx context.Context, msg *sdkws.MsgData) { + log.ZDebug(nctx, "setConversationAtInfo", msg) ctx := mcontext.NewCtx("@@@" + mcontext.GetOperationID(nctx)) var atUserID []string conversation := &pbConversation.ConversationReq{ From 93ebd534dafaeb15a384f65a0514a22a6e7ec732 Mon Sep 17 00:00:00 2001 From: Gordon <1432970085@qq.com> Date: Tue, 27 Jun 2023 20:54:10 +0800 Subject: [PATCH 3/6] test: log --- internal/rpc/msg/send.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/internal/rpc/msg/send.go b/internal/rpc/msg/send.go index bb51c2425..a7fc6533b 100644 --- a/internal/rpc/msg/send.go +++ b/internal/rpc/msg/send.go @@ -63,7 +63,7 @@ func (m *msgServer) sendMsgSuperGroupChat(ctx context.Context, req *pbMsg.SendMs return resp, nil } func (m *msgServer) setConversationAtInfo(nctx context.Context, msg *sdkws.MsgData) { - log.ZDebug(nctx, "setConversationAtInfo", msg) + log.ZDebug(nctx, "setConversationAtInfo", "msg", msg) ctx := mcontext.NewCtx("@@@" + mcontext.GetOperationID(nctx)) var atUserID []string conversation := &pbConversation.ConversationReq{ @@ -85,14 +85,14 @@ func (m *msgServer) setConversationAtInfo(nctx context.Context, msg *sdkws.MsgDa conversation.GroupAtType = &wrapperspb.Int32Value{Value: constant.AtAllAtMe} err := m.Conversation.SetConversations(ctx, atUserID, conversation) if err != nil { - log.ZWarn(ctx, "SetConversations", err, atUserID, conversation) + log.ZWarn(ctx, "SetConversations", err, "userID", atUserID, "conversation", conversation) } memberUserIDList = utils.DifferenceString(atUserID, memberUserIDList) } conversation.GroupAtType = &wrapperspb.Int32Value{Value: constant.AtAll} err = m.Conversation.SetConversations(ctx, memberUserIDList, conversation) if err != nil { - log.ZWarn(ctx, "SetConversations", err, memberUserIDList, conversation) + log.ZWarn(ctx, "SetConversations", err, "userID", memberUserIDList, "conversation", conversation) } } else { conversation.GroupAtType = &wrapperspb.Int32Value{Value: constant.AtMe} From 21d1e672cd31ff66ba65796ff23e526bf73a104b Mon Sep 17 00:00:00 2001 From: Gordon <1432970085@qq.com> Date: Tue, 27 Jun 2023 21:19:57 +0800 Subject: [PATCH 4/6] fix: conversation update --- internal/rpc/conversation/conversaion.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/internal/rpc/conversation/conversaion.go b/internal/rpc/conversation/conversaion.go index 9af5baad8..a664fadeb 100644 --- a/internal/rpc/conversation/conversaion.go +++ b/internal/rpc/conversation/conversaion.go @@ -210,6 +210,9 @@ func (c *conversationServer) SetConversations(ctx context.Context, req *pbConver if req.Conversation.IsPinned != nil { m["is_pinned"] = req.Conversation.IsPinned.Value } + if req.Conversation.GroupAtType != nil { + m["group_at_type"] = req.Conversation.GroupAtType.Value + } if req.Conversation.IsPrivateChat != nil { var conversations []*tableRelation.ConversationModel for _, ownerUserID := range req.UserIDs { From bc5f244732ec797d74d7a4e7c37d0e3c779b0653 Mon Sep 17 00:00:00 2001 From: Gordon <1432970085@qq.com> Date: Wed, 28 Jun 2023 12:04:50 +0800 Subject: [PATCH 5/6] test: log --- internal/msggateway/client.go | 1 + 1 file changed, 1 insertion(+) diff --git a/internal/msggateway/client.go b/internal/msggateway/client.go index 6bce68c85..b38761b88 100644 --- a/internal/msggateway/client.go +++ b/internal/msggateway/client.go @@ -212,6 +212,7 @@ func (c *Client) PushMessage(ctx context.Context, msgData *sdkws.MsgData) error } else { msg.Msgs = m } + log.ZDebug(ctx, "PushMessage", "msg", msg) data, err := proto.Marshal(&msg) if err != nil { return err From 420cc4535a3e188e7c331fb144bee93749ba85a2 Mon Sep 17 00:00:00 2001 From: wangchuxiao Date: Wed, 28 Jun 2023 15:23:12 +0800 Subject: [PATCH 6/6] grpc --- pkg/common/mw/rpc_client_interceptor.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/common/mw/rpc_client_interceptor.go b/pkg/common/mw/rpc_client_interceptor.go index 554ae0a8f..b7791ada6 100644 --- a/pkg/common/mw/rpc_client_interceptor.go +++ b/pkg/common/mw/rpc_client_interceptor.go @@ -31,10 +31,10 @@ func RpcClientInterceptor(ctx context.Context, method string, req, resp interfac log.ZDebug(ctx, "get rpc ctx success", "conn target", cc.Target()) err = invoker(ctx, method, req, resp, cc, opts...) if err == nil { - // log.ZInfo(ctx, "rpc client resp", "funcName", method, "resp", rpcString(resp)) + log.ZInfo(ctx, "rpc client resp", "funcName", method, "resp", rpcString(resp)) return nil } - // log.ZError(ctx, "rpc resp error", err) + log.ZError(ctx, "rpc resp error", err) rpcErr, ok := err.(interface{ GRPCStatus() *status.Status }) if !ok { return errs.ErrInternalServer.Wrap(err.Error())