From 9625cc57be333ec1a4e1fc6a551368bee66cd63a Mon Sep 17 00:00:00 2001 From: withchao <993506633@qq.com> Date: Fri, 20 Dec 2024 18:29:57 +0800 Subject: [PATCH] rpc client --- go.mod | 5 ++ internal/rpc/conversation/conversation.go | 57 ++++++----------- internal/rpc/group/group.go | 29 +++------ internal/rpc/group/notification.go | 4 ++ internal/rpc/msg/clear.go | 78 +---------------------- internal/rpc/msg/delete.go | 26 ++------ internal/rpc/msg/send.go | 21 +----- internal/rpc/msg/server.go | 64 ++++++++++--------- internal/rpc/msg/stream_msg.go | 7 +- internal/rpc/user/user.go | 8 ++- pkg/rpcclient/msg.go | 5 +- pkg/rpcli/auth.go | 11 ++++ pkg/rpcli/conversation.go | 48 ++++++++++++++ pkg/rpcli/group.go | 28 ++++++++ pkg/rpcli/msg.go | 54 ++++++++++++++++ pkg/rpcli/msggateway.go | 13 ++++ pkg/rpcli/push.go | 13 ++++ pkg/rpcli/relation.go | 11 ++++ pkg/rpcli/rtc.go | 13 ++++ pkg/rpcli/third.go | 11 ++++ pkg/rpcli/tool.go | 32 ++++++++++ pkg/rpcli/user.go | 11 ++++ 22 files changed, 335 insertions(+), 214 deletions(-) create mode 100644 pkg/rpcli/auth.go create mode 100644 pkg/rpcli/conversation.go create mode 100644 pkg/rpcli/group.go create mode 100644 pkg/rpcli/msg.go create mode 100644 pkg/rpcli/msggateway.go create mode 100644 pkg/rpcli/push.go create mode 100644 pkg/rpcli/relation.go create mode 100644 pkg/rpcli/rtc.go create mode 100644 pkg/rpcli/third.go create mode 100644 pkg/rpcli/tool.go create mode 100644 pkg/rpcli/user.go diff --git a/go.mod b/go.mod index 6c1d421c8..9379af790 100644 --- a/go.mod +++ b/go.mod @@ -221,3 +221,8 @@ require ( golang.org/x/crypto v0.27.0 // indirect gopkg.in/ini.v1 v1.67.0 // indirect ) + + +replace ( + github.com/openimsdk/protocol => /Users/chao/Desktop/code/protocol +) \ No newline at end of file diff --git a/internal/rpc/conversation/conversation.go b/internal/rpc/conversation/conversation.go index 696ada152..24d0a08e0 100644 --- a/internal/rpc/conversation/conversation.go +++ b/internal/rpc/conversation/conversation.go @@ -16,6 +16,7 @@ package conversation import ( "context" + "github.com/openimsdk/open-im-server/v3/pkg/rpcli" "sort" "time" @@ -26,9 +27,6 @@ import ( dbModel "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" "github.com/openimsdk/open-im-server/v3/pkg/localcache" "github.com/openimsdk/open-im-server/v3/pkg/msgprocessor" - pbgroup "github.com/openimsdk/protocol/group" - pbmsg "github.com/openimsdk/protocol/msg" - "github.com/openimsdk/protocol/rpccall" "github.com/openimsdk/tools/db/redisutil" "github.com/openimsdk/open-im-server/v3/pkg/common/convert" @@ -52,6 +50,9 @@ type conversationServer struct { conversationNotificationSender *ConversationNotificationSender config *Config + // todo + msgClient *rpcli.MsgClient + groupClient *rpcli.GroupClient } type Config struct { @@ -118,19 +119,12 @@ func (c *conversationServer) GetSortedConversationList(ctx context.Context, req if len(conversations) == 0 { return nil, errs.ErrRecordNotFound.Wrap() } - - maxSeqs, err := rpccall.ExtractField(ctx, pbmsg.GetMaxSeqsCaller.Invoke, - &pbmsg.GetMaxSeqsReq{ConversationIDs: conversationIDs}, - (*pbmsg.SeqsInfoResp).GetMaxSeqs) + maxSeqs, err := c.msgClient.GetMaxSeqs(ctx, conversationIDs) if err != nil { return nil, err } - chatLogs, err := rpccall.ExtractField(ctx, pbmsg.GetMsgByConversationIDsCaller.Invoke, - &pbmsg.GetMsgByConversationIDsReq{ - ConversationIDs: conversationIDs, - MaxSeqs: maxSeqs, - }, (*pbmsg.GetMsgByConversationIDsResp).GetMsgDatas) + chatLogs, err := c.msgClient.GetMsgByConversationIDs(ctx, conversationIDs, maxSeqs) if err != nil { return nil, err } @@ -140,9 +134,7 @@ func (c *conversationServer) GetSortedConversationList(ctx context.Context, req return nil, err } - hasReadSeqs, err := rpccall.ExtractField(ctx, pbmsg.GetHasReadSeqsCaller.Invoke, - &pbmsg.GetHasReadSeqsReq{ConversationIDs: conversationIDs}, - (*pbmsg.SeqsInfoResp).GetMaxSeqs) + hasReadSeqs, err := c.msgClient.GetHasReadSeqs(ctx, conversationIDs, req.UserID) if err != nil { return nil, err } @@ -230,14 +222,7 @@ func (c *conversationServer) SetConversations(ctx context.Context, req *pbconver return nil, errs.ErrArgs.WrapMsg("conversation must not be nil") } if req.Conversation.ConversationType == constant.WriteGroupChatType { - groupInfo, err := rpccall.ExtractField(ctx, pbgroup.GetGroupsInfoCaller.Invoke, - &pbgroup.GetGroupsInfoReq{GroupIDs: []string{req.Conversation.GroupID}}, - func(r *pbgroup.GetGroupsInfoResp) *sdkws.GroupInfo { - if len(r.GroupInfos) > 0 { - return r.GroupInfos[0] - } - return nil - }) + groupInfo, err := c.groupClient.GetGroupInfo(ctx, req.Conversation.GroupID) if err != nil { return nil, err } @@ -444,14 +429,14 @@ func (c *conversationServer) CreateGroupChatConversations(ctx context.Context, r return nil, err } conversationID := msgprocessor.GetConversationIDBySessionType(constant.ReadGroupChatType, req.GroupID) - if _, err := pbmsg.SetUserConversationMaxSeqCaller.Invoke(ctx, &pbmsg.SetUserConversationMaxSeqReq{ConversationID: conversationID, OwnerUserID: req.UserIDs, MaxSeq: 0}); err != nil { + if err := c.msgClient.SetUserConversationMaxSeq(ctx, conversationID, req.UserIDs, 0); err != nil { return nil, err } return &pbconversation.CreateGroupChatConversationsResp{}, nil } func (c *conversationServer) SetConversationMaxSeq(ctx context.Context, req *pbconversation.SetConversationMaxSeqReq) (*pbconversation.SetConversationMaxSeqResp, error) { - if _, err := pbmsg.SetUserConversationMaxSeqCaller.Invoke(ctx, &pbmsg.SetUserConversationMaxSeqReq{ConversationID: req.ConversationID, OwnerUserID: req.OwnerUserID, MaxSeq: req.MaxSeq}); err != nil { + if err := c.msgClient.SetUserConversationMaxSeq(ctx, req.ConversationID, req.OwnerUserID, req.MaxSeq); err != nil { return nil, err } if err := c.conversationDatabase.UpdateUsersConversationField(ctx, req.OwnerUserID, req.ConversationID, @@ -465,7 +450,7 @@ func (c *conversationServer) SetConversationMaxSeq(ctx context.Context, req *pbc } func (c *conversationServer) SetConversationMinSeq(ctx context.Context, req *pbconversation.SetConversationMinSeqReq) (*pbconversation.SetConversationMinSeqResp, error) { - if _, err := pbmsg.SetUserConversationMinSeqCaller.Invoke(ctx, &pbmsg.SetUserConversationMinSeqReq{ConversationID: req.ConversationID, OwnerUserID: req.OwnerUserID, MinSeq: req.MinSeq}); err != nil { + if err := c.msgClient.SetUserConversationMin(ctx, req.ConversationID, req.OwnerUserID, req.MinSeq); err != nil { return nil, err } if err := c.conversationDatabase.UpdateUsersConversationField(ctx, req.OwnerUserID, req.ConversationID, @@ -584,9 +569,7 @@ func (c *conversationServer) getConversationInfo( } } if len(groupIDs) != 0 { - groupInfos, err := rpccall.ExtractField(ctx, pbgroup.GetGroupsInfoCaller.Invoke, - &pbgroup.GetGroupsInfoReq{GroupIDs: groupIDs}, - (*pbgroup.GetGroupsInfoResp).GetGroupInfos) + groupInfos, err := c.groupClient.GetGroupsInfo(ctx, groupIDs) if err != nil { return nil, err } @@ -774,23 +757,22 @@ func (c *conversationServer) ClearUserConversationMsg(ctx context.Context, req * if conversation.IsMsgDestruct == false || conversation.MsgDestructTime == 0 { continue } - rcpReq := &pbmsg.GetLastMessageSeqByTimeReq{ConversationID: conversation.ConversationID, Time: req.Timestamp - conversation.MsgDestructTime} - resp, err := pbmsg.GetLastMessageSeqByTime.Invoke(ctx, rcpReq) + seq, err := c.msgClient.GetLastMessageSeqByTime(ctx, conversation.ConversationID, req.Timestamp-conversation.MsgDestructTime) if err != nil { return nil, err } - if resp.Seq <= 0 { - log.ZDebug(ctx, "ClearUserConversationMsg GetLastMessageSeqByTime seq <= 0", "index", i, "conversationID", conversation.ConversationID, "ownerUserID", conversation.OwnerUserID, "msgDestructTime", conversation.MsgDestructTime, "seq", resp.Seq) + if seq <= 0 { + log.ZDebug(ctx, "ClearUserConversationMsg GetLastMessageSeqByTime seq <= 0", "index", i, "conversationID", conversation.ConversationID, "ownerUserID", conversation.OwnerUserID, "msgDestructTime", conversation.MsgDestructTime, "seq", seq) if err := c.setConversationMinSeqAndLatestMsgDestructTime(ctx, conversation.ConversationID, conversation.OwnerUserID, -1, latestMsgDestructTime); err != nil { return nil, err } continue } - resp.Seq++ - if err := c.setConversationMinSeqAndLatestMsgDestructTime(ctx, conversation.ConversationID, conversation.OwnerUserID, resp.Seq, latestMsgDestructTime); err != nil { + seq++ + if err := c.setConversationMinSeqAndLatestMsgDestructTime(ctx, conversation.ConversationID, conversation.OwnerUserID, seq, latestMsgDestructTime); err != nil { return nil, err } - log.ZDebug(ctx, "ClearUserConversationMsg set min seq", "index", i, "conversationID", conversation.ConversationID, "ownerUserID", conversation.OwnerUserID, "seq", resp.Seq, "msgDestructTime", conversation.MsgDestructTime) + log.ZDebug(ctx, "ClearUserConversationMsg set min seq", "index", i, "conversationID", conversation.ConversationID, "ownerUserID", conversation.OwnerUserID, "seq", seq, "msgDestructTime", conversation.MsgDestructTime) } return &pbconversation.ClearUserConversationMsgResp{Count: int32(len(conversations))}, nil } @@ -800,8 +782,7 @@ func (c *conversationServer) setConversationMinSeqAndLatestMsgDestructTime(ctx c "latest_msg_destruct_time": latestMsgDestructTime, } if minSeq >= 0 { - req := &pbmsg.SetUserConversationMinSeqReq{ConversationID: conversationID, OwnerUserID: []string{ownerUserID}, MinSeq: minSeq} - if _, err := pbmsg.SetUserConversationMinSeqCaller.Invoke(ctx, req); err != nil { + if err := c.msgClient.SetUserConversationMin(ctx, conversationID, []string{ownerUserID}, minSeq); err != nil { return err } update["min_seq"] = minSeq diff --git a/internal/rpc/group/group.go b/internal/rpc/group/group.go index 8af09b4c5..131cb461b 100644 --- a/internal/rpc/group/group.go +++ b/internal/rpc/group/group.go @@ -17,6 +17,7 @@ package group import ( "context" "fmt" + "github.com/openimsdk/open-im-server/v3/pkg/rpcli" "math/big" "math/rand" "strconv" @@ -41,8 +42,6 @@ import ( "github.com/openimsdk/protocol/constant" pbconv "github.com/openimsdk/protocol/conversation" pbgroup "github.com/openimsdk/protocol/group" - "github.com/openimsdk/protocol/msg" - "github.com/openimsdk/protocol/rpccall" "github.com/openimsdk/protocol/sdkws" "github.com/openimsdk/protocol/wrapperspb" "github.com/openimsdk/tools/db/mongoutil" @@ -63,6 +62,9 @@ type groupServer struct { notification *GroupNotificationSender config *Config webhookClient *webhook.Client + // todo + msgClient *rpcli.MsgClient + conversationClient *rpcli.ConversationClient } type Config struct { @@ -950,18 +952,11 @@ func (g *groupServer) QuitGroup(ctx context.Context, req *pbgroup.QuitGroupReq) func (g *groupServer) deleteMemberAndSetConversationSeq(ctx context.Context, groupID string, userIDs []string) error { conversationID := msgprocessor.GetConversationIDBySessionType(constant.ReadGroupChatType, groupID) - maxSeq, err := rpccall.ExtractField(ctx, msg.GetConversationMaxSeqCaller.Invoke, - &msg.GetConversationMaxSeqReq{ConversationID: conversationID}, - (*msg.GetConversationMaxSeqResp).GetMaxSeq) + maxSeq, err := g.msgClient.GetConversationMaxSeq(ctx, conversationID) if err != nil { return err } - - return pbconv.SetConversationMaxSeqCaller.Execute(ctx, &pbconv.SetConversationMaxSeqReq{ - ConversationID: conversationID, - OwnerUserID: userIDs, - MaxSeq: maxSeq, - }) + return g.conversationClient.SetConversationMaxSeq(ctx, conversationID, userIDs, maxSeq) } func (g *groupServer) SetGroupInfo(ctx context.Context, req *pbgroup.SetGroupInfoReq) (*pbgroup.SetGroupInfoResp, error) { @@ -1037,11 +1032,7 @@ func (g *groupServer) SetGroupInfo(ctx context.Context, req *pbgroup.SetGroupInf return } conversation.GroupAtType = &wrapperspb.Int32Value{Value: constant.GroupNotification} - - if err := pbconv.SetConversationsCaller.Execute(ctx, &pbconv.SetConversationsReq{ - UserIDs: resp.UserIDs, - Conversation: conversation, - }); err != nil { + if err := g.conversationClient.SetConversations(ctx, resp.UserIDs, conversation); err != nil { log.ZWarn(ctx, "SetConversations", err, "UserIDs", resp.UserIDs, "conversation", conversation) } }() @@ -1154,11 +1145,7 @@ func (g *groupServer) SetGroupInfoEx(ctx context.Context, req *pbgroup.SetGroupI } conversation.GroupAtType = &wrapperspb.Int32Value{Value: constant.GroupNotification} - - if err := pbconv.SetConversationsCaller.Execute(ctx, &pbconv.SetConversationsReq{ - UserIDs: resp.UserIDs, - Conversation: conversation, - }); err != nil { + if err := g.conversationClient.SetConversations(ctx, resp.UserIDs, conversation); err != nil { log.ZWarn(ctx, "SetConversations", err, "UserIDs", resp.UserIDs, "conversation", conversation) } }() diff --git a/internal/rpc/group/notification.go b/internal/rpc/group/notification.go index 784ec8943..4a9be4ec0 100644 --- a/internal/rpc/group/notification.go +++ b/internal/rpc/group/notification.go @@ -518,9 +518,13 @@ func (g *GroupNotificationSender) GroupApplicationAgreeMemberEnterNotification(c if !g.config.RpcConfig.EnableHistoryForNewMembers { conversationID := msgprocessor.GetConversationIDBySessionType(constant.ReadGroupChatType, groupID) + maxSeq, err := rpccall.ExtractField(ctx, msg.GetConversationMaxSeqCaller.Invoke, &msg.GetConversationMaxSeqReq{ConversationID: conversationID}, (*msg.GetConversationMaxSeqResp).GetMaxSeq) + maxSeq,err := g. + + if err != nil { return err } diff --git a/internal/rpc/msg/clear.go b/internal/rpc/msg/clear.go index 7a2d36300..8e14b281e 100644 --- a/internal/rpc/msg/clear.go +++ b/internal/rpc/msg/clear.go @@ -2,23 +2,13 @@ package msg import ( "context" - "strings" - "time" - "github.com/openimsdk/open-im-server/v3/pkg/authverify" - "github.com/openimsdk/open-im-server/v3/pkg/common/convert" - pbconv "github.com/openimsdk/protocol/conversation" "github.com/openimsdk/protocol/msg" - "github.com/openimsdk/protocol/wrapperspb" "github.com/openimsdk/tools/log" - "github.com/openimsdk/tools/mcontext" - "github.com/openimsdk/tools/utils/datautil" - "github.com/openimsdk/tools/utils/idutil" - "github.com/openimsdk/tools/utils/stringutil" - "golang.org/x/sync/errgroup" + "strings" ) -// hard delete in Database. +// DestructMsgs hard delete in Database. func (m *msgServer) DestructMsgs(ctx context.Context, req *msg.DestructMsgsReq) (*msg.DestructMsgsResp, error) { if err := authverify.CheckAdmin(ctx, m.config.Share.IMAdminUserID); err != nil { return nil, err @@ -61,70 +51,6 @@ func (m *msgServer) DestructMsgs(ctx context.Context, req *msg.DestructMsgsReq) return &msg.DestructMsgsResp{Count: int32(len(docs))}, nil } -// soft delete for user self -func (m *msgServer) ClearMsg(ctx context.Context, req *msg.ClearMsgReq) (*msg.ClearMsgResp, error) { - temp := convert.ConversationsPb2DB(req.Conversations) - - batchNum := 100 - - errg, _ := errgroup.WithContext(ctx) - errg.SetLimit(100) - - for i := 0; i < len(temp); i += batchNum { - batch := temp[i:min(i+batchNum, len(temp))] - - errg.Go(func() error { - for _, conversation := range batch { - handleCtx := mcontext.NewCtx(stringutil.GetSelfFuncName() + "-" + idutil.OperationIDGenerator() + "-" + conversation.ConversationID + "-" + conversation.OwnerUserID) - log.ZDebug(handleCtx, "User MsgsDestruct", - "conversationID", conversation.ConversationID, - "ownerUserID", conversation.OwnerUserID, - "msgDestructTime", conversation.MsgDestructTime, - "lastMsgDestructTime", conversation.LatestMsgDestructTime) - - seqs, err := m.MsgDatabase.ClearUserMsgs(handleCtx, conversation.OwnerUserID, conversation.ConversationID, conversation.MsgDestructTime, conversation.LatestMsgDestructTime) - if err != nil { - log.ZError(handleCtx, "user msg destruct failed", err, "conversationID", conversation.ConversationID, "ownerUserID", conversation.OwnerUserID) - continue - } - - if len(seqs) > 0 { - minseq := datautil.Max(seqs...) - - // update - if err := pbconv.UpdateConversationCaller.Execute(ctx, &pbconv.UpdateConversationReq{ - ConversationID: conversation.ConversationID, - UserIDs: []string{conversation.OwnerUserID}, - MinSeq: wrapperspb.Int64(minseq), - LatestMsgDestructTime: wrapperspb.Int64(time.Now().UnixMilli()), - }); err != nil { - log.ZError(handleCtx, "updateUsersConversationField failed", err, "conversationID", conversation.ConversationID, "ownerUserID", conversation.OwnerUserID) - continue - } - - if err := pbconv.SetConversationMinSeqCaller.Execute(ctx, &pbconv.SetConversationMinSeqReq{ - ConversationID: conversation.ConversationID, - OwnerUserID: []string{conversation.OwnerUserID}, - MinSeq: minseq, - }); err != nil { - return err - } - - // if you need Notify SDK client userseq is update. - // m.msgNotificationSender.UserDeleteMsgsNotification(handleCtx, conversation.OwnerUserID, conversation.ConversationID, seqs) - } - } - return nil - }) - } - - if err := errg.Wait(); err != nil { - return nil, err - } - - return nil, nil -} - func (m *msgServer) GetLastMessageSeqByTime(ctx context.Context, req *msg.GetLastMessageSeqByTimeReq) (*msg.GetLastMessageSeqByTimeResp, error) { seq, err := m.MsgDatabase.GetLastMessageSeqByTime(ctx, req.ConversationID, req.Time) if err != nil { diff --git a/internal/rpc/msg/delete.go b/internal/rpc/msg/delete.go index e0c3a89ed..f45713ff1 100644 --- a/internal/rpc/msg/delete.go +++ b/internal/rpc/msg/delete.go @@ -21,7 +21,6 @@ import ( "github.com/openimsdk/protocol/constant" "github.com/openimsdk/protocol/conversation" "github.com/openimsdk/protocol/msg" - "github.com/openimsdk/protocol/rpccall" "github.com/openimsdk/protocol/sdkws" "github.com/openimsdk/tools/log" "github.com/openimsdk/tools/utils/timeutil" @@ -75,22 +74,13 @@ func (m *msgServer) DeleteMsgs(ctx context.Context, req *msg.DeleteMsgsReq) (*ms if err := m.MsgDatabase.DeleteMsgsPhysicalBySeqs(ctx, req.ConversationID, req.Seqs); err != nil { return nil, err } - - conversations, err := rpccall.ExtractField(ctx, conversation.GetConversationsByConversationIDCaller.Invoke, &conversation.GetConversationsByConversationIDReq{ - ConversationIDs: []string{req.ConversationID}, - }, (*conversation.GetConversationsByConversationIDResp).GetConversations) + conv, err := m.conversationClient.GetConversationsByConversationID(ctx, req.ConversationID) if err != nil { return nil, err } tips := &sdkws.DeleteMsgsTips{UserID: req.UserID, ConversationID: req.ConversationID, Seqs: req.Seqs} - m.notificationSender.NotificationWithSessionType( - ctx, - req.UserID, - m.conversationAndGetRecvID(conversations[0], req.UserID), - constant.DeleteMsgsNotification, - conversations[0].ConversationType, - tips, - ) + m.notificationSender.NotificationWithSessionType(ctx, req.UserID, m.conversationAndGetRecvID(conv, req.UserID), + constant.DeleteMsgsNotification, conv.ConversationType, tips) } else { if err := m.MsgDatabase.DeleteUserMsgsBySeqs(ctx, req.UserID, req.ConversationID, req.Seqs); err != nil { return nil, err @@ -125,9 +115,7 @@ func (m *msgServer) DeleteMsgPhysical(ctx context.Context, req *msg.DeleteMsgPhy } func (m *msgServer) clearConversation(ctx context.Context, conversationIDs []string, userID string, deleteSyncOpt *msg.DeleteSyncOpt) error { - conversations, err := rpccall.ExtractField(ctx, conversation.GetConversationsByConversationIDCaller.Invoke, &conversation.GetConversationsByConversationIDReq{ - ConversationIDs: conversationIDs, - }, (*conversation.GetConversationsByConversationIDResp).GetConversations) + conversations, err := m.conversationClient.GetConversationsByConversationIDs(ctx, conversationIDs) if err != nil { return err } @@ -150,11 +138,7 @@ func (m *msgServer) clearConversation(ctx context.Context, conversationIDs []str } ownerUserIDs := []string{userID} for conversationID, seq := range setSeqs { - if err := conversation.SetConversationMinSeqCaller.Execute(ctx, &conversation.SetConversationMinSeqReq{ - ConversationID: conversationID, - OwnerUserID: ownerUserIDs, - MinSeq: seq, - }); err != nil { + if err := m.conversationClient.SetConversationMinSeq(ctx, conversationID, ownerUserIDs, seq); err != nil { return err } } diff --git a/internal/rpc/msg/send.go b/internal/rpc/msg/send.go index bb20206f7..b9bbf615f 100644 --- a/internal/rpc/msg/send.go +++ b/internal/rpc/msg/send.go @@ -118,25 +118,14 @@ func (m *msgServer) setConversationAtInfo(nctx context.Context, msg *sdkws.MsgDa conversation.GroupAtType = &wrapperspb.Int32Value{Value: constant.AtAll} } else { // @Everyone and @other people conversation.GroupAtType = &wrapperspb.Int32Value{Value: constant.AtAllAtMe} - - err = pbconv.SetConversationsCaller.Execute(ctx, &pbconv.SetConversationsReq{ - UserIDs: atUserID, - Conversation: conversation, - }) - if err != nil { + if err := m.conversationClient.SetConversations(ctx, atUserID, conversation); err != nil { log.ZWarn(ctx, "SetConversations", err, "userID", atUserID, "conversation", conversation) } - memberUserIDList = datautil.Single(atUserID, memberUserIDList) } conversation.GroupAtType = &wrapperspb.Int32Value{Value: constant.AtAll} - - err = pbconv.SetConversationsCaller.Execute(ctx, &pbconv.SetConversationsReq{ - UserIDs: memberUserIDList, - Conversation: conversation, - }) - if err != nil { + if err := m.conversationClient.SetConversations(ctx, memberUserIDList, conversation); err != nil { log.ZWarn(ctx, "SetConversations", err, "userID", memberUserIDList, "conversation", conversation) } @@ -144,11 +133,7 @@ func (m *msgServer) setConversationAtInfo(nctx context.Context, msg *sdkws.MsgDa } conversation.GroupAtType = &wrapperspb.Int32Value{Value: constant.AtMe} - err := pbconv.SetConversationsCaller.Execute(ctx, &pbconv.SetConversationsReq{ - UserIDs: msg.AtUserIDList, - Conversation: conversation, - }) - if err != nil { + if err := m.conversationClient.SetConversations(ctx, msg.AtUserIDList, conversation); err != nil { log.ZWarn(ctx, "SetConversations", err, msg.AtUserIDList, conversation) } } diff --git a/internal/rpc/msg/server.go b/internal/rpc/msg/server.go index 9bd2a6946..29219864a 100644 --- a/internal/rpc/msg/server.go +++ b/internal/rpc/msg/server.go @@ -16,6 +16,7 @@ package msg import ( "context" + "github.com/openimsdk/open-im-server/v3/pkg/rpcli" "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/redis" @@ -36,39 +37,40 @@ import ( ) type MessageInterceptorFunc func(ctx context.Context, globalConfig *Config, req *msg.SendMsgReq) (*sdkws.MsgData, error) -type ( - // MessageInterceptorChain defines a chain of message interceptor functions. - MessageInterceptorChain []MessageInterceptorFunc - // MsgServer encapsulates dependencies required for message handling. - msgServer struct { - RegisterCenter discovery.SvcDiscoveryRegistry // Service discovery registry for service registration. - MsgDatabase controller.CommonMsgDatabase // Interface for message database operations. - StreamMsgDatabase controller.StreamMsgDatabase - UserLocalCache *rpccache.UserLocalCache // Local cache for user data. - FriendLocalCache *rpccache.FriendLocalCache // Local cache for friend data. - GroupLocalCache *rpccache.GroupLocalCache // Local cache for group data. - ConversationLocalCache *rpccache.ConversationLocalCache // Local cache for conversation data. - Handlers MessageInterceptorChain // Chain of handlers for processing messages. - notificationSender *rpcclient.NotificationSender // RPC client for sending notifications. - msgNotificationSender *MsgNotificationSender // RPC client for sending msg notifications. - config *Config // Global configuration settings. - webhookClient *webhook.Client - msg.UnimplementedMsgServer - } +// MessageInterceptorChain defines a chain of message interceptor functions. +type MessageInterceptorChain []MessageInterceptorFunc - Config struct { - RpcConfig config.Msg - RedisConfig config.Redis - MongodbConfig config.Mongo - KafkaConfig config.Kafka - NotificationConfig config.Notification - Share config.Share - WebhooksConfig config.Webhooks - LocalCacheConfig config.LocalCache - Discovery config.Discovery - } -) +type Config struct { + RpcConfig config.Msg + RedisConfig config.Redis + MongodbConfig config.Mongo + KafkaConfig config.Kafka + NotificationConfig config.Notification + Share config.Share + WebhooksConfig config.Webhooks + LocalCacheConfig config.LocalCache + Discovery config.Discovery +} + +// MsgServer encapsulates dependencies required for message handling. +type msgServer struct { + RegisterCenter discovery.SvcDiscoveryRegistry // Service discovery registry for service registration. + MsgDatabase controller.CommonMsgDatabase // Interface for message database operations. + StreamMsgDatabase controller.StreamMsgDatabase + UserLocalCache *rpccache.UserLocalCache // Local cache for user data. + FriendLocalCache *rpccache.FriendLocalCache // Local cache for friend data. + GroupLocalCache *rpccache.GroupLocalCache // Local cache for group data. + ConversationLocalCache *rpccache.ConversationLocalCache // Local cache for conversation data. + Handlers MessageInterceptorChain // Chain of handlers for processing messages. + notificationSender *rpcclient.NotificationSender // RPC client for sending notifications. + msgNotificationSender *MsgNotificationSender // RPC client for sending msg notifications. + config *Config // Global configuration settings. + webhookClient *webhook.Client + msg.UnimplementedMsgServer + // todo + conversationClient rpcli.ConversationClient +} func (m *msgServer) addInterceptorHandler(interceptorFunc ...MessageInterceptorFunc) { m.Handlers = append(m.Handlers, interceptorFunc...) diff --git a/internal/rpc/msg/stream_msg.go b/internal/rpc/msg/stream_msg.go index e216b1087..688d766c8 100644 --- a/internal/rpc/msg/stream_msg.go +++ b/internal/rpc/msg/stream_msg.go @@ -8,9 +8,7 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" "github.com/openimsdk/open-im-server/v3/pkg/msgprocessor" "github.com/openimsdk/protocol/constant" - pbconv "github.com/openimsdk/protocol/conversation" "github.com/openimsdk/protocol/msg" - "github.com/openimsdk/protocol/rpccall" "github.com/openimsdk/protocol/sdkws" "github.com/openimsdk/tools/errs" ) @@ -74,10 +72,7 @@ func (m *msgServer) AppendStreamMsg(ctx context.Context, req *msg.AppendStreamMs if err := m.StreamMsgDatabase.AppendStreamMsg(ctx, req.ClientMsgID, int(req.StartIndex), req.Packets, req.End, deadlineTime); err != nil { return nil, err } - conversation, err := rpccall.ExtractField(ctx, pbconv.GetConversationCaller.Invoke, &pbconv.GetConversationReq{ - ConversationID: res.ConversationID, - OwnerUserID: res.UserID, - }, (*pbconv.GetConversationResp).GetConversation) + conversation, err := m.conversationClient.GetConversation(ctx, res.ConversationID, res.UserID) if err != nil { return nil, err } diff --git a/internal/rpc/user/user.go b/internal/rpc/user/user.go index c2bcabaa1..8c3a54f66 100644 --- a/internal/rpc/user/user.go +++ b/internal/rpc/user/user.go @@ -17,6 +17,7 @@ package user import ( "context" "errors" + "github.com/openimsdk/open-im-server/v3/pkg/rpcli" "math/rand" "strings" "sync" @@ -59,6 +60,9 @@ type userServer struct { RegisterCenter registry.SvcDiscoveryRegistry config *Config webhookClient *webhook.Client + // todo + groupClient *rpcli.GroupClient + relationClient *rpcli.RelationClient } type Config struct { @@ -633,7 +637,7 @@ func (s *userServer) NotificationUserInfoUpdate(ctx context.Context, userID stri wg.Add(len(es)) go func() { defer wg.Done() - _, es[0] = group.NotificationUserInfoUpdateCaller.Invoke(ctx, &group.NotificationUserInfoUpdateReq{ + _, es[0] = s.groupClient.NotificationUserInfoUpdate(ctx, &group.NotificationUserInfoUpdateReq{ UserID: userID, OldUserInfo: oldUserInfo, NewUserInfo: newUserInfo, @@ -642,7 +646,7 @@ func (s *userServer) NotificationUserInfoUpdate(ctx context.Context, userID stri go func() { defer wg.Done() - _, es[1] = friendpb.NotificationUserInfoUpdateCaller.Invoke(ctx, &friendpb.NotificationUserInfoUpdateReq{ + _, es[1] = s.relationClient.NotificationUserInfoUpdate(ctx, &friendpb.NotificationUserInfoUpdateReq{ UserID: userID, OldUserInfo: oldUserInfo, NewUserInfo: newUserInfo, diff --git a/pkg/rpcclient/msg.go b/pkg/rpcclient/msg.go index ca9b8fc68..f0f0be40f 100644 --- a/pkg/rpcclient/msg.go +++ b/pkg/rpcclient/msg.go @@ -17,6 +17,7 @@ package rpcclient import ( "context" "encoding/json" + "github.com/openimsdk/open-im-server/v3/pkg/rpcli" "time" "google.golang.org/protobuf/proto" @@ -132,6 +133,8 @@ type NotificationSender struct { sendMsg func(ctx context.Context, req *msg.SendMsgReq) (*msg.SendMsgResp, error) getUserInfo func(ctx context.Context, userID string) (*sdkws.UserInfo, error) queue *memamq.MemoryQueue + // todo + msgClient *rpcli.MsgClient } func WithQueue(queue *memamq.MemoryQueue) NotificationSenderOptions { @@ -151,7 +154,7 @@ func WithLocalSendMsg(sendMsg func(ctx context.Context, req *msg.SendMsgReq) (*m func WithRpcClient() NotificationSenderOptions { return func(s *NotificationSender) { s.sendMsg = func(ctx context.Context, req *msg.SendMsgReq) (*msg.SendMsgResp, error) { - return msg.SendMsgCaller.Invoke(ctx, req) + return s.msgClient.SendMsg(ctx, req) } } } diff --git a/pkg/rpcli/auth.go b/pkg/rpcli/auth.go new file mode 100644 index 000000000..a159eb955 --- /dev/null +++ b/pkg/rpcli/auth.go @@ -0,0 +1,11 @@ +package rpcli + +import "github.com/openimsdk/protocol/auth" + +func NewAuthClient(cli auth.AuthClient) *AuthClient { + return &AuthClient{cli} +} + +type AuthClient struct { + auth.AuthClient +} diff --git a/pkg/rpcli/conversation.go b/pkg/rpcli/conversation.go new file mode 100644 index 000000000..c099500cf --- /dev/null +++ b/pkg/rpcli/conversation.go @@ -0,0 +1,48 @@ +package rpcli + +import ( + "context" + "github.com/openimsdk/protocol/conversation" +) + +func NewConversationClient(cli conversation.ConversationClient) *ConversationClient { + return &ConversationClient{cli} +} + +type ConversationClient struct { + conversation.ConversationClient +} + +func (x *ConversationClient) SetConversationMaxSeq(ctx context.Context, conversationID string, ownerUserIDs []string, maxSeq int64) error { + req := &conversation.SetConversationMaxSeqReq{ConversationID: conversationID, OwnerUserID: ownerUserIDs, MaxSeq: maxSeq} + return ignoreResp(x.ConversationClient.SetConversationMaxSeq(ctx, req)) +} + +func (x *ConversationClient) SetConversations(ctx context.Context, userIDs []string, info *conversation.ConversationReq) error { + req := &conversation.SetConversationsReq{UserIDs: userIDs, Conversation: info} + return ignoreResp(x.ConversationClient.SetConversations(ctx, req)) +} + +func (x *ConversationClient) GetConversationsByConversationIDs(ctx context.Context, conversationIDs []string) ([]*conversation.Conversation, error) { + req := &conversation.GetConversationsByConversationIDReq{ConversationIDs: conversationIDs} + return extractField(ctx, x.ConversationClient.GetConversationsByConversationID, req, (*conversation.GetConversationsByConversationIDResp).GetConversations) +} + +func (x *ConversationClient) GetConversationsByConversationID(ctx context.Context, conversationID string) (*conversation.Conversation, error) { + return firstValue(x.GetConversationsByConversationIDs(ctx, []string{conversationID})) +} + +func (x *ConversationClient) SetConversationMinSeq(ctx context.Context, conversationID string, ownerUserIDs []string, minSeq int64) error { + req := &conversation.SetConversationMinSeqReq{ConversationID: conversationID, OwnerUserID: ownerUserIDs, MinSeq: minSeq} + return ignoreResp(x.ConversationClient.SetConversationMinSeq(ctx, req)) +} + +func (x *ConversationClient) GetConversation(ctx context.Context, conversationID string, ownerUserID string) (*conversation.Conversation, error) { + req := &conversation.GetConversationReq{ConversationID: conversationID, OwnerUserID: ownerUserID} + return extractField(ctx, x.ConversationClient.GetConversation, req, (*conversation.GetConversationResp).GetConversation) +} + +func (x *ConversationClient) GetConversations(ctx context.Context, conversationIDs []string, ownerUserID string) ([]*conversation.Conversation, error) { + req := &conversation.GetConversationsReq{ConversationIDs: conversationIDs, OwnerUserID: ownerUserID} + return extractField(ctx, x.ConversationClient.GetConversations, req, (*conversation.GetConversationsResp).GetConversations) +} diff --git a/pkg/rpcli/group.go b/pkg/rpcli/group.go new file mode 100644 index 000000000..763a92dfd --- /dev/null +++ b/pkg/rpcli/group.go @@ -0,0 +1,28 @@ +package rpcli + +import ( + "context" + "github.com/openimsdk/protocol/group" + "github.com/openimsdk/protocol/sdkws" +) + +func NewGroupClient(cli group.GroupClient) *GroupClient { + return &GroupClient{cli} +} + +type GroupClient struct { + group.GroupClient +} + +func (x *GroupClient) cli() group.GroupClient { + return x.GroupClient +} + +func (x *GroupClient) GetGroupsInfo(ctx context.Context, groupIDs []string) ([]*sdkws.GroupInfo, error) { + req := &group.GetGroupsInfoReq{GroupIDs: groupIDs} + return extractField(ctx, x.cli().GetGroupsInfo, req, (*group.GetGroupsInfoResp).GetGroupInfos) +} + +func (x *GroupClient) GetGroupInfo(ctx context.Context, groupID string) (*sdkws.GroupInfo, error) { + return firstValue(x.GetGroupsInfo(ctx, []string{groupID})) +} diff --git a/pkg/rpcli/msg.go b/pkg/rpcli/msg.go new file mode 100644 index 000000000..513ed80af --- /dev/null +++ b/pkg/rpcli/msg.go @@ -0,0 +1,54 @@ +package rpcli + +import ( + "context" + "github.com/openimsdk/protocol/msg" + "github.com/openimsdk/protocol/sdkws" +) + +func NewMsgClient(cli msg.MsgClient) *MsgClient { + return &MsgClient{cli} +} + +type MsgClient struct { + msg.MsgClient +} + +func (x *MsgClient) cli() msg.MsgClient { + return x.MsgClient +} + +func (x *MsgClient) GetMaxSeqs(ctx context.Context, conversationIDs []string) (map[string]int64, error) { + req := &msg.GetMaxSeqsReq{ConversationIDs: conversationIDs} + return extractField(ctx, x.cli().GetMaxSeqs, req, (*msg.SeqsInfoResp).GetMaxSeqs) +} + +func (x *MsgClient) GetMsgByConversationIDs(ctx context.Context, conversationIDs []string, maxSeqs map[string]int64) (map[string]*sdkws.MsgData, error) { + req := &msg.GetMsgByConversationIDsReq{ConversationIDs: conversationIDs, MaxSeqs: maxSeqs} + return extractField(ctx, x.cli().GetMsgByConversationIDs, req, (*msg.GetMsgByConversationIDsResp).GetMsgDatas) +} + +func (x *MsgClient) GetHasReadSeqs(ctx context.Context, conversationIDs []string, userID string) (map[string]int64, error) { + req := &msg.GetHasReadSeqsReq{ConversationIDs: conversationIDs, UserID: userID} + return extractField(ctx, x.cli().GetHasReadSeqs, req, (*msg.SeqsInfoResp).GetMaxSeqs) +} + +func (x *MsgClient) SetUserConversationMaxSeq(ctx context.Context, conversationID string, ownerUserIDs []string, maxSeq int64) error { + req := &msg.SetUserConversationMaxSeqReq{ConversationID: conversationID, OwnerUserID: ownerUserIDs, MaxSeq: maxSeq} + return ignoreResp(x.cli().SetUserConversationMaxSeq(ctx, req)) +} + +func (x *MsgClient) SetUserConversationMin(ctx context.Context, conversationID string, ownerUserIDs []string, minSeq int64) error { + req := &msg.SetUserConversationsMinSeqReq{ConversationID: conversationID, UserIDs: ownerUserIDs, Seq: minSeq} + return ignoreResp(x.cli().SetUserConversationsMinSeq(ctx, req)) +} + +func (x *MsgClient) GetLastMessageSeqByTime(ctx context.Context, conversationID string, lastTime int64) (int64, error) { + req := &msg.GetLastMessageSeqByTimeReq{ConversationID: conversationID, Time: lastTime} + return extractField(ctx, x.cli().GetLastMessageSeqByTime, req, (*msg.GetLastMessageSeqByTimeResp).GetSeq) +} + +func (x *MsgClient) GetConversationMaxSeq(ctx context.Context, conversationID string) (int64, error) { + req := &msg.GetConversationMaxSeqReq{ConversationID: conversationID} + return extractField(ctx, x.cli().GetConversationMaxSeq, req, (*msg.GetConversationMaxSeqResp).GetMaxSeq) +} diff --git a/pkg/rpcli/msggateway.go b/pkg/rpcli/msggateway.go new file mode 100644 index 000000000..2b25b50e9 --- /dev/null +++ b/pkg/rpcli/msggateway.go @@ -0,0 +1,13 @@ +package rpcli + +import ( + "github.com/openimsdk/protocol/msggateway" +) + +func NewMsgGatewayClient(cli msggateway.MsgGatewayClient) *MsgGatewayClient { + return &MsgGatewayClient{cli} +} + +type MsgGatewayClient struct { + msggateway.MsgGatewayClient +} diff --git a/pkg/rpcli/push.go b/pkg/rpcli/push.go new file mode 100644 index 000000000..7e928daa5 --- /dev/null +++ b/pkg/rpcli/push.go @@ -0,0 +1,13 @@ +package rpcli + +import ( + "github.com/openimsdk/protocol/push" +) + +func NewPushMsgServiceClient(cli push.PushMsgServiceClient) *PushMsgServiceClient { + return &PushMsgServiceClient{cli} +} + +type PushMsgServiceClient struct { + push.PushMsgServiceClient +} diff --git a/pkg/rpcli/relation.go b/pkg/rpcli/relation.go new file mode 100644 index 000000000..02a591b84 --- /dev/null +++ b/pkg/rpcli/relation.go @@ -0,0 +1,11 @@ +package rpcli + +import "github.com/openimsdk/protocol/relation" + +func NewRelationClient(cli relation.FriendClient) *RelationClient { + return &RelationClient{cli} +} + +type RelationClient struct { + relation.FriendClient +} diff --git a/pkg/rpcli/rtc.go b/pkg/rpcli/rtc.go new file mode 100644 index 000000000..1c9973619 --- /dev/null +++ b/pkg/rpcli/rtc.go @@ -0,0 +1,13 @@ +package rpcli + +import ( + "github.com/openimsdk/protocol/rtc" +) + +func NewRtcServiceClient(cli rtc.RtcServiceClient) *RtcServiceClient { + return &RtcServiceClient{cli} +} + +type RtcServiceClient struct { + rtc.RtcServiceClient +} diff --git a/pkg/rpcli/third.go b/pkg/rpcli/third.go new file mode 100644 index 000000000..7ac0e84a7 --- /dev/null +++ b/pkg/rpcli/third.go @@ -0,0 +1,11 @@ +package rpcli + +import "github.com/openimsdk/protocol/third" + +func NewThirdClient(cli third.ThirdClient) *ThirdClient { + return &ThirdClient{cli} +} + +type ThirdClient struct { + third.ThirdClient +} diff --git a/pkg/rpcli/tool.go b/pkg/rpcli/tool.go new file mode 100644 index 000000000..2bd50bd00 --- /dev/null +++ b/pkg/rpcli/tool.go @@ -0,0 +1,32 @@ +package rpcli + +import ( + "context" + "github.com/openimsdk/tools/errs" + "google.golang.org/grpc" +) + +func extractField[A, B, C any](ctx context.Context, fn func(ctx context.Context, req *A, opts ...grpc.CallOption) (*B, error), req *A, get func(*B) C) (C, error) { + resp, err := fn(ctx, req) + if err != nil { + var c C + return c, err + } + return get(resp), nil +} + +func firstValue[A any](val []A, err error) (A, error) { + if err != nil { + var a A + return a, err + } + if len(val) == 0 { + var a A + return a, errs.ErrRecordNotFound.WrapMsg("record not found") + } + return val[0], nil +} + +func ignoreResp(_ any, err error) error { + return err +} diff --git a/pkg/rpcli/user.go b/pkg/rpcli/user.go new file mode 100644 index 000000000..c9a72e7e3 --- /dev/null +++ b/pkg/rpcli/user.go @@ -0,0 +1,11 @@ +package rpcli + +import "github.com/openimsdk/protocol/user" + +func NewUserClient(cli user.UserClient) *UserClient { + return &UserClient{cli} +} + +type UserClient struct { + user.UserClient +}