From a11dd60c28c3d468eb7e495fef634cef906e9ba0 Mon Sep 17 00:00:00 2001 From: withchao <993506633@qq.com> Date: Mon, 23 Dec 2024 16:35:58 +0800 Subject: [PATCH] rpc client --- internal/api/jssdk/jssdk.go | 64 +++--- internal/msggateway/init.go | 5 +- internal/msgtransfer/init.go | 7 +- .../msgtransfer/online_history_msg_handler.go | 42 ++-- internal/push/push.go | 7 +- internal/push/push_handler.go | 58 +++--- internal/rpc/auth/auth.go | 19 +- internal/rpc/conversation/conversation.go | 24 ++- internal/rpc/conversation/notification.go | 8 +- internal/rpc/group/group.go | 78 ++++---- internal/rpc/group/notification.go | 142 ++++++------- internal/rpc/msg/server.go | 31 ++- internal/rpc/relation/black.go | 24 ++- internal/rpc/relation/friend.go | 32 ++- internal/rpc/relation/notification.go | 11 +- internal/rpc/third/log.go | 6 +- internal/rpc/third/third.go | 7 + internal/rpc/user/notification.go | 8 +- internal/rpc/user/user.go | 9 +- pkg/common/convert/group.go | 13 +- pkg/common/startrpc/start.go | 5 - pkg/rpccache/conversation.go | 38 ++-- pkg/rpccache/friend.go | 12 +- pkg/rpccache/group.go | 24 +-- pkg/rpccache/online.go | 35 +--- pkg/rpccache/subscriber.go | 1 - pkg/rpccache/user.go | 12 +- pkg/rpcclient/init.go | 116 +++++------ pkg/rpcclient/msg.go | 8 +- pkg/rpcclient/user.go | 187 +++++++++--------- pkg/rpcli/auth.go | 9 +- pkg/rpcli/conversation.go | 29 ++- pkg/rpcli/group.go | 31 ++- pkg/rpcli/msg.go | 38 ++-- pkg/rpcli/msggateway.go | 5 +- pkg/rpcli/push.go | 5 +- pkg/rpcli/relation.go | 15 +- pkg/rpcli/rtc.go | 5 +- pkg/rpcli/third.go | 9 +- pkg/rpcli/user.go | 68 ++++++- 40 files changed, 707 insertions(+), 540 deletions(-) diff --git a/internal/api/jssdk/jssdk.go b/internal/api/jssdk/jssdk.go index 409fcbf79..036cb027a 100644 --- a/internal/api/jssdk/jssdk.go +++ b/internal/api/jssdk/jssdk.go @@ -2,16 +2,15 @@ package jssdk import ( "context" + "github.com/openimsdk/open-im-server/v3/pkg/rpcli" "sort" "github.com/gin-gonic/gin" "github.com/openimsdk/protocol/conversation" - "github.com/openimsdk/protocol/group" "github.com/openimsdk/protocol/jssdk" "github.com/openimsdk/protocol/msg" "github.com/openimsdk/protocol/relation" "github.com/openimsdk/protocol/sdkws" - "github.com/openimsdk/protocol/user" "github.com/openimsdk/tools/mcontext" "github.com/openimsdk/tools/utils/datautil" ) @@ -26,6 +25,11 @@ func NewJSSdkApi() *JSSdk { } type JSSdk struct { + userClient rpcli.UserClient + relationClient rpcli.RelationClient + groupClient rpcli.GroupClient + conversationClient rpcli.ConversationClient + msgClient rpcli.MsgClient } func (x *JSSdk) GetActiveConversations(c *gin.Context) { @@ -57,11 +61,11 @@ func (x *JSSdk) fillConversations(ctx context.Context, conversations []*jssdk.Co groupMap map[string]*sdkws.GroupInfo ) if len(userIDs) > 0 { - users, err := field(ctx, user.GetDesignateUsersCaller.Invoke, &user.GetDesignateUsersReq{UserIDs: userIDs}, (*user.GetDesignateUsersResp).GetUsersInfo) + users, err := x.userClient.GetUsersInfo(ctx, userIDs) if err != nil { return err } - friends, err := field(ctx, relation.GetFriendInfoCaller.Invoke, &relation.GetFriendInfoReq{OwnerUserID: conversations[0].Conversation.OwnerUserID, FriendUserIDs: userIDs}, (*relation.GetFriendInfoResp).GetFriendInfos) + friends, err := x.relationClient.GetFriendsInfo(ctx, conversations[0].Conversation.OwnerUserID, userIDs) if err != nil { return err } @@ -69,11 +73,11 @@ func (x *JSSdk) fillConversations(ctx context.Context, conversations []*jssdk.Co friendMap = datautil.SliceToMap(friends, (*relation.FriendInfoOnly).GetFriendUserID) } if len(groupIDs) > 0 { - resp, err := group.GetGroupsInfoCaller.Invoke(ctx, &group.GetGroupsInfoReq{GroupIDs: groupIDs}) + groups, err := x.groupClient.GetGroupsInfo(ctx, groupIDs) if err != nil { return err } - groupMap = datautil.SliceToMap(resp.GroupInfos, (*sdkws.GroupInfo).GetGroupID) + groupMap = datautil.SliceToMap(groups, (*sdkws.GroupInfo).GetGroupID) } for _, c := range conversations { if c.Conversation.GroupID == "" { @@ -91,21 +95,18 @@ func (x *JSSdk) getActiveConversations(ctx context.Context, req *jssdk.GetActive req.Count = defaultGetActiveConversation } req.OwnerUserID = mcontext.GetOpUserID(ctx) - conversationIDs, err := field(ctx, conversation.GetConversationIDsCaller.Invoke, - &conversation.GetConversationIDsReq{UserID: req.OwnerUserID}, (*conversation.GetConversationIDsResp).GetConversationIDs) + conversationIDs, err := x.conversationClient.GetConversationIDs(ctx, req.OwnerUserID) if err != nil { return nil, err } if len(conversationIDs) == 0 { return &jssdk.GetActiveConversationsResp{}, nil } - readSeq, err := field(ctx, msg.GetHasReadSeqsCaller.Invoke, - &msg.GetHasReadSeqsReq{UserID: req.OwnerUserID, ConversationIDs: conversationIDs}, (*msg.SeqsInfoResp).GetMaxSeqs) + readSeq, err := x.msgClient.GetHasReadSeqs(ctx, conversationIDs, req.OwnerUserID) if err != nil { return nil, err } - activeConversation, err := field(ctx, msg.GetActiveConversationCaller.Invoke, - &msg.GetActiveConversationReq{ConversationIDs: conversationIDs}, (*msg.GetActiveConversationResp).GetConversations) + activeConversation, err := x.msgClient.GetActiveConversation(ctx, conversationIDs) if err != nil { return nil, err } @@ -116,8 +117,7 @@ func (x *JSSdk) getActiveConversations(ctx context.Context, req *jssdk.GetActive Conversation: activeConversation, } if len(activeConversation) > 1 { - pinnedConversationIDs, err := field(ctx, conversation.GetPinnedConversationIDsCaller.Invoke, - &conversation.GetPinnedConversationIDsReq{UserID: req.OwnerUserID}, (*conversation.GetPinnedConversationIDsResp).GetConversationIDs) + pinnedConversationIDs, err := x.conversationClient.GetPinnedConversationIDs(ctx, req.OwnerUserID) if err != nil { return nil, err } @@ -125,25 +125,18 @@ func (x *JSSdk) getActiveConversations(ctx context.Context, req *jssdk.GetActive } sort.Sort(&sortConversations) sortList := sortConversations.Top(int(req.Count)) - conversations, err := field(ctx, conversation.GetConversationsCaller.Invoke, - &conversation.GetConversationsReq{ - OwnerUserID: req.OwnerUserID, - ConversationIDs: datautil.Slice(sortList, func(c *msg.ActiveConversation) string { - return c.ConversationID - })}, (*conversation.GetConversationsResp).GetConversations) + conversations, err := x.conversationClient.GetConversations(ctx, datautil.Slice(sortList, func(c *msg.ActiveConversation) string { + return c.ConversationID + }), req.OwnerUserID) if err != nil { return nil, err } - msgs, err := field(ctx, msg.GetSeqMessageCaller.Invoke, - &msg.GetSeqMessageReq{ - UserID: req.OwnerUserID, - Conversations: datautil.Slice(sortList, func(c *msg.ActiveConversation) *msg.ConversationSeqs { - return &msg.ConversationSeqs{ - ConversationID: c.ConversationID, - Seqs: []int64{c.MaxSeq}, - } - }), - }, (*msg.GetSeqMessageResp).GetMsgs) + msgs, err := x.msgClient.GetSeqMessage(ctx, req.OwnerUserID, datautil.Slice(sortList, func(c *msg.ActiveConversation) *msg.ConversationSeqs { + return &msg.ConversationSeqs{ + ConversationID: c.ConversationID, + Seqs: []int64{c.MaxSeq}, + } + })) if err != nil { return nil, err } @@ -185,7 +178,7 @@ func (x *JSSdk) getActiveConversations(ctx context.Context, req *jssdk.GetActive func (x *JSSdk) getConversations(ctx context.Context, req *jssdk.GetConversationsReq) (*jssdk.GetConversationsResp, error) { req.OwnerUserID = mcontext.GetOpUserID(ctx) - conversations, err := field(ctx, conversation.GetConversationsCaller.Invoke, &conversation.GetConversationsReq{OwnerUserID: req.OwnerUserID, ConversationIDs: req.ConversationIDs}, (*conversation.GetConversationsResp).GetConversations) + conversations, err := x.conversationClient.GetConversations(ctx, req.ConversationIDs, req.OwnerUserID) if err != nil { return nil, err } @@ -195,13 +188,11 @@ func (x *JSSdk) getConversations(ctx context.Context, req *jssdk.GetConversation req.ConversationIDs = datautil.Slice(conversations, func(c *conversation.Conversation) string { return c.ConversationID }) - maxSeqs, err := field(ctx, msg.GetMaxSeqsCaller.Invoke, - &msg.GetMaxSeqsReq{ConversationIDs: req.ConversationIDs}, (*msg.SeqsInfoResp).GetMaxSeqs) + maxSeqs, err := x.msgClient.GetMaxSeqs(ctx, req.ConversationIDs) if err != nil { return nil, err } - readSeqs, err := field(ctx, msg.GetHasReadSeqsCaller.Invoke, - &msg.GetHasReadSeqsReq{UserID: req.OwnerUserID, ConversationIDs: req.ConversationIDs}, (*msg.SeqsInfoResp).GetMaxSeqs) + readSeqs, err := x.msgClient.GetHasReadSeqs(ctx, req.ConversationIDs, req.OwnerUserID) if err != nil { return nil, err } @@ -216,8 +207,7 @@ func (x *JSSdk) getConversations(ctx context.Context, req *jssdk.GetConversation } var msgs map[string]*sdkws.PullMsgs if len(conversationSeqs) > 0 { - msgs, err = field(ctx, msg.GetSeqMessageCaller.Invoke, - &msg.GetSeqMessageReq{UserID: req.OwnerUserID, Conversations: conversationSeqs}, (*msg.GetSeqMessageResp).GetMsgs) + msgs, err = x.msgClient.GetSeqMessage(ctx, req.OwnerUserID, conversationSeqs) if err != nil { return nil, err } diff --git a/internal/msggateway/init.go b/internal/msggateway/init.go index 156d32b4d..00cc79ff6 100644 --- a/internal/msggateway/init.go +++ b/internal/msggateway/init.go @@ -62,8 +62,9 @@ func Start(ctx context.Context, index int, conf *Config) error { ) hubServer := NewServer(longServer, conf, func(srv *Server) error { - longServer.online, _ = rpccache.NewOnlineCache(conf.Share.IMAdminUserID, nil, rdb, false, longServer.subscriberUserOnlineStatusChanges) - return nil + var err error + longServer.online, err = rpccache.NewOnlineCache(conf.Share.IMAdminUserID, nil, rdb, false, longServer.subscriberUserOnlineStatusChanges) + return err }) go longServer.ChangeOnlineStatus(4) diff --git a/internal/msgtransfer/init.go b/internal/msgtransfer/init.go index 19a53ebd5..5cb613123 100644 --- a/internal/msgtransfer/init.go +++ b/internal/msgtransfer/init.go @@ -25,7 +25,6 @@ import ( "strconv" "syscall" - "github.com/openimsdk/open-im-server/v3/pkg/rpcclient" "github.com/openimsdk/tools/discovery" "github.com/openimsdk/tools/discovery/etcd" "github.com/openimsdk/tools/utils/jsonutil" @@ -93,10 +92,6 @@ func Start(ctx context.Context, index int, config *Config) error { } client.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, "round_robin"))) - if err = rpcclient.InitRpcCaller(client, config.Discovery.RpcService); err != nil { - return err - } - msgModel := redis.NewMsgCache(rdb) msgDocModel, err := mgo.NewMsgMongo(mgocli.GetDB()) if err != nil { @@ -116,7 +111,7 @@ func Start(ctx context.Context, index int, config *Config) error { if err != nil { return err } - historyCH, err := NewOnlineHistoryRedisConsumerHandler(&config.KafkaConfig, msgTransferDatabase) + historyCH, err := NewOnlineHistoryRedisConsumerHandler(ctx, client, config, msgTransferDatabase) if err != nil { return err } diff --git a/internal/msgtransfer/online_history_msg_handler.go b/internal/msgtransfer/online_history_msg_handler.go index 9287d6b61..83b075061 100644 --- a/internal/msgtransfer/online_history_msg_handler.go +++ b/internal/msgtransfer/online_history_msg_handler.go @@ -18,6 +18,8 @@ import ( "context" "encoding/json" "errors" + "github.com/openimsdk/open-im-server/v3/pkg/rpcli" + "github.com/openimsdk/tools/discovery" "strconv" "strings" "sync" @@ -25,15 +27,12 @@ import ( "github.com/IBM/sarama" "github.com/go-redis/redis" - "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller" "github.com/openimsdk/open-im-server/v3/pkg/msgprocessor" "github.com/openimsdk/open-im-server/v3/pkg/tools/batcher" "github.com/openimsdk/protocol/constant" pbconv "github.com/openimsdk/protocol/conversation" - "github.com/openimsdk/protocol/group" - "github.com/openimsdk/protocol/rpccall" "github.com/openimsdk/protocol/sdkws" "github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/log" @@ -72,16 +71,30 @@ type OnlineHistoryRedisConsumerHandler struct { msgTransferDatabase controller.MsgTransferDatabase conversationUserHasReadChan chan *userHasReadSeq wg sync.WaitGroup + + groupClient *rpcli.GroupClient + conversationClient *rpcli.ConversationClient } -func NewOnlineHistoryRedisConsumerHandler(kafkaConf *config.Kafka, database controller.MsgTransferDatabase) (*OnlineHistoryRedisConsumerHandler, error) { +func NewOnlineHistoryRedisConsumerHandler(ctx context.Context, client discovery.SvcDiscoveryRegistry, config *Config, database controller.MsgTransferDatabase) (*OnlineHistoryRedisConsumerHandler, error) { + kafkaConf := config.KafkaConfig historyConsumerGroup, err := kafka.NewMConsumerGroup(kafkaConf.Build(), kafkaConf.ToRedisGroupID, []string{kafkaConf.ToRedisTopic}, false) if err != nil { return nil, err } + groupConn, err := client.GetConn(ctx, config.Discovery.RpcService.Group) + if err != nil { + return nil, err + } + conversationConn, err := client.GetConn(ctx, config.Discovery.RpcService.Conversation) + if err != nil { + return nil, err + } var och OnlineHistoryRedisConsumerHandler och.msgTransferDatabase = database och.conversationUserHasReadChan = make(chan *userHasReadSeq, hasReadChanBuffer) + och.groupClient = rpcli.NewGroupClient(groupConn) + och.conversationClient = rpcli.NewConversationClient(conversationConn) och.wg.Add(1) b := batcher.New[sarama.ConsumerMessage]( @@ -109,15 +122,13 @@ func (och *OnlineHistoryRedisConsumerHandler) do(ctx context.Context, channelID ctx = mcontext.WithTriggerIDContext(ctx, val.TriggerID()) ctxMessages := och.parseConsumerMessages(ctx, val.Val()) ctx = withAggregationCtx(ctx, ctxMessages) - log.ZInfo(ctx, "msg arrived channel", "channel id", channelID, "msgList length", len(ctxMessages), - "key", val.Key()) + log.ZInfo(ctx, "msg arrived channel", "channel id", channelID, "msgList length", len(ctxMessages), "key", val.Key()) och.doSetReadSeq(ctx, ctxMessages) storageMsgList, notStorageMsgList, storageNotificationList, notStorageNotificationList := och.categorizeMessageLists(ctxMessages) log.ZDebug(ctx, "number of categorized messages", "storageMsgList", len(storageMsgList), "notStorageMsgList", - len(notStorageMsgList), "storageNotificationList", len(storageNotificationList), "notStorageNotificationList", - len(notStorageNotificationList)) + len(notStorageMsgList), "storageNotificationList", len(storageNotificationList), "notStorageNotificationList", len(notStorageNotificationList)) conversationIDMsg := msgprocessor.GetChatConversationIDByMsg(ctxMessages[0].message) conversationIDNotification := msgprocessor.GetNotificationConversationIDByMsg(ctxMessages[0].message) @@ -282,31 +293,26 @@ func (och *OnlineHistoryRedisConsumerHandler) handleMsg(ctx context.Context, key log.ZDebug(ctx, "group chat first create conversation", "conversationID", conversationID) - userIDs, err := rpccall.ExtractField(ctx, group.GetGroupMemberUserIDsCaller.Invoke, - &group.GetGroupMemberUserIDsReq{ - GroupID: msg.GroupID, - }, (*group.GetGroupMemberUserIDsResp).GetUserIDs) + userIDs, err := och.groupClient.GetGroupMemberUserIDs(ctx, msg.GroupID) if err != nil { log.ZWarn(ctx, "get group member ids error", err, "conversationID", conversationID) } else { log.ZInfo(ctx, "GetGroupMemberIDs end") - if err := pbconv.CreateGroupChatConversationsCaller.Execute(ctx, &pbconv.CreateGroupChatConversationsReq{ - UserIDs: userIDs, - GroupID: msg.GroupID, - }); err != nil { + if err := och.conversationClient.CreateGroupChatConversations(ctx, msg.GroupID, userIDs); err != nil { log.ZWarn(ctx, "single chat first create conversation error", err, "conversationID", conversationID) } } case constant.SingleChatType, constant.NotificationChatType: - if err := pbconv.CreateSingleChatConversationsCaller.Execute(ctx, &pbconv.CreateSingleChatConversationsReq{ + req := &pbconv.CreateSingleChatConversationsReq{ RecvID: msg.RecvID, SendID: msg.SendID, ConversationID: conversationID, ConversationType: msg.SessionType, - }); err != nil { + } + if err := och.conversationClient.CreateSingleChatConversations(ctx, req); err != nil { log.ZWarn(ctx, "single chat or notification first create conversation error", err, "conversationID", conversationID, "sessionType", msg.SessionType) } diff --git a/internal/push/push.go b/internal/push/push.go index 3247426e9..b7c1ec427 100644 --- a/internal/push/push.go +++ b/internal/push/push.go @@ -37,11 +37,6 @@ type Config struct { runTimeEnv string } -func (p pushServer) PushMsg(ctx context.Context, req *pbpush.PushMsgReq) (*pbpush.PushMsgResp, error) { - //todo reserved Interface - return nil, nil -} - func (p pushServer) DelUserPushToken(ctx context.Context, req *pbpush.DelUserPushTokenReq) (resp *pbpush.DelUserPushTokenResp, err error) { if err = p.database.DelFcmToken(ctx, req.UserID, int(req.PlatformID)); err != nil { @@ -65,7 +60,7 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg database := controller.NewPushDatabase(cacheModel, &config.KafkaConfig) - consumer, err := NewConsumerHandler(config, database, offlinePusher, rdb, client) + consumer, err := NewConsumerHandler(ctx, config, database, offlinePusher, rdb, client) if err != nil { return err } diff --git a/internal/push/push_handler.go b/internal/push/push_handler.go index ee855e122..d5d457c0d 100644 --- a/internal/push/push_handler.go +++ b/internal/push/push_handler.go @@ -3,6 +3,7 @@ package push import ( "context" "encoding/json" + "github.com/openimsdk/open-im-server/v3/pkg/rpcli" "math/rand" "strconv" "time" @@ -17,12 +18,8 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/rpccache" "github.com/openimsdk/open-im-server/v3/pkg/util/conversationutil" "github.com/openimsdk/protocol/constant" - pbconv "github.com/openimsdk/protocol/conversation" - "github.com/openimsdk/protocol/group" - "github.com/openimsdk/protocol/msg" "github.com/openimsdk/protocol/msggateway" pbpush "github.com/openimsdk/protocol/push" - "github.com/openimsdk/protocol/rpccall" "github.com/openimsdk/protocol/sdkws" "github.com/openimsdk/tools/discovery" "github.com/openimsdk/tools/log" @@ -45,9 +42,13 @@ type ConsumerHandler struct { conversationLocalCache *rpccache.ConversationLocalCache webhookClient *webhook.Client config *Config + userClient *rpcli.UserClient + groupClient *rpcli.GroupClient + msgClient *rpcli.MsgClient + conversationClient *rpcli.ConversationClient } -func NewConsumerHandler(config *Config, database controller.PushDatabase, offlinePusher offlinepush.OfflinePusher, rdb redis.UniversalClient, +func NewConsumerHandler(ctx context.Context, config *Config, database controller.PushDatabase, offlinePusher offlinepush.OfflinePusher, rdb redis.UniversalClient, client discovery.SvcDiscoveryRegistry) (*ConsumerHandler, error) { var consumerHandler ConsumerHandler var err error @@ -56,15 +57,35 @@ func NewConsumerHandler(config *Config, database controller.PushDatabase, offlin if err != nil { return nil, err } + userConn, err := client.GetConn(ctx, config.Discovery.RpcService.User) + if err != nil { + return nil, err + } + groupConn, err := client.GetConn(ctx, config.Discovery.RpcService.Group) + if err != nil { + return nil, err + } + msgConn, err := client.GetConn(ctx, config.Discovery.RpcService.Msg) + if err != nil { + return nil, err + } + conversationConn, err := client.GetConn(ctx, config.Discovery.RpcService.Conversation) + if err != nil { + return nil, err + } + consumerHandler.userClient = rpcli.NewUserClient(userConn) + consumerHandler.groupClient = rpcli.NewGroupClient(groupConn) + consumerHandler.msgClient = rpcli.NewMsgClient(msgConn) + consumerHandler.conversationClient = rpcli.NewConversationClient(conversationConn) consumerHandler.offlinePusher = offlinePusher consumerHandler.onlinePusher = NewOnlinePusher(client, config) - consumerHandler.groupLocalCache = rpccache.NewGroupLocalCache(&config.LocalCacheConfig, rdb) - consumerHandler.conversationLocalCache = rpccache.NewConversationLocalCache(&config.LocalCacheConfig, rdb) + consumerHandler.groupLocalCache = rpccache.NewGroupLocalCache(consumerHandler.groupClient, &config.LocalCacheConfig, rdb) + consumerHandler.conversationLocalCache = rpccache.NewConversationLocalCache(consumerHandler.conversationClient, &config.LocalCacheConfig, rdb) consumerHandler.webhookClient = webhook.NewWebhookClient(config.WebhooksConfig.URL) consumerHandler.config = config consumerHandler.pushDatabase = database - consumerHandler.onlineCache, err = rpccache.NewOnlineCache(config.Share.IMAdminUserID, consumerHandler.groupLocalCache, rdb, config.RpcConfig.FullUserCache, nil) + consumerHandler.onlineCache, err = rpccache.NewOnlineCache(consumerHandler.userClient, consumerHandler.groupLocalCache, rdb, config.RpcConfig.FullUserCache, nil) if err != nil { return nil, err } @@ -321,7 +342,7 @@ func (c *ConsumerHandler) groupMessagesHandler(ctx context.Context, groupID stri ctx = mcontext.WithOpUserIDContext(ctx, c.config.Share.IMAdminUserID[0]) } defer func(groupID string) { - if err = group.DismissGroupCaller.Execute(ctx, &group.DismissGroupReq{GroupID: groupID}); err != nil { + if err := c.groupClient.DismissGroup(ctx, groupID, true); err != nil { log.ZError(ctx, "DismissGroup Notification clear members", err, "groupID", groupID) } }(groupID) @@ -347,12 +368,7 @@ func (c *ConsumerHandler) offlinePushMsg(ctx context.Context, msg *sdkws.MsgData func (c *ConsumerHandler) filterGroupMessageOfflinePush(ctx context.Context, groupID string, msg *sdkws.MsgData, offlinePushUserIDs []string) (userIDs []string, err error) { - - //todo local cache Obtain the difference set through local comparison. - needOfflinePushUserIDs, err := rpccall.ExtractField(ctx, pbconv.GetConversationOfflinePushUserIDsCaller.Invoke, &pbconv.GetConversationOfflinePushUserIDsReq{ - ConversationID: conversationutil.GenGroupConversationID(groupID), - UserIDs: offlinePushUserIDs, - }, (*pbconv.GetConversationOfflinePushUserIDsResp).GetUserIDs) + needOfflinePushUserIDs, err := c.conversationClient.GetConversationOfflinePushUserIDs(ctx, conversationutil.GenGroupConversationID(groupID), offlinePushUserIDs) if err != nil { return nil, err } @@ -406,18 +422,11 @@ func (c *ConsumerHandler) getOfflinePushInfos(msg *sdkws.MsgData) (title, conten func (c *ConsumerHandler) DeleteMemberAndSetConversationSeq(ctx context.Context, groupID string, userIDs []string) error { conversationID := msgprocessor.GetConversationIDBySessionType(constant.ReadGroupChatType, groupID) - maxSeq, err := rpccall.ExtractField(ctx, msg.GetConversationMaxSeqCaller.Invoke, - &msg.GetConversationMaxSeqReq{ConversationID: conversationID}, - (*msg.GetConversationMaxSeqResp).GetMaxSeq) + maxSeq, err := c.msgClient.GetConversationMaxSeq(ctx, conversationID) if err != nil { return err } - - return pbconv.SetConversationMaxSeqCaller.Execute(ctx, &pbconv.SetConversationMaxSeqReq{ - ConversationID: conversationID, - OwnerUserID: userIDs, - MaxSeq: maxSeq, - }) + return c.conversationClient.SetConversationMaxSeq(ctx, conversationID, userIDs, maxSeq) } func unmarshalNotificationElem(bytes []byte, t any) error { @@ -425,6 +434,5 @@ func unmarshalNotificationElem(bytes []byte, t any) error { if err := json.Unmarshal(bytes, ¬ification); err != nil { return err } - return json.Unmarshal([]byte(notification.Detail), t) } diff --git a/internal/rpc/auth/auth.go b/internal/rpc/auth/auth.go index c220863c6..a2b14f40d 100644 --- a/internal/rpc/auth/auth.go +++ b/internal/rpc/auth/auth.go @@ -17,6 +17,7 @@ package auth import ( "context" "errors" + "github.com/openimsdk/open-im-server/v3/pkg/rpcli" "github.com/openimsdk/open-im-server/v3/pkg/common/config" redis2 "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/redis" @@ -28,7 +29,6 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" "github.com/openimsdk/open-im-server/v3/pkg/common/servererrs" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller" - "github.com/openimsdk/open-im-server/v3/pkg/rpcclient" pbauth "github.com/openimsdk/protocol/auth" "github.com/openimsdk/protocol/constant" "github.com/openimsdk/protocol/msggateway" @@ -44,6 +44,7 @@ type authServer struct { authDatabase controller.AuthDatabase RegisterCenter discovery.SvcDiscoveryRegistry config *Config + userClient *rpcli.UserClient } type Config struct { @@ -58,6 +59,10 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg if err != nil { return err } + userConn, err := client.GetConn(ctx, config.Discovery.RpcService.User) + if err != nil { + return err + } pbauth.RegisterAuthServer(server, &authServer{ RegisterCenter: client, authDatabase: controller.NewAuthDatabase( @@ -67,7 +72,8 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg config.Share.MultiLogin, config.Share.IMAdminUserID, ), - config: config, + config: config, + userClient: rpcli.NewUserClient(userConn), }) return nil } @@ -83,7 +89,7 @@ func (s *authServer) GetAdminToken(ctx context.Context, req *pbauth.GetAdminToke } - if _, err := rpcclient.GetUserInfo(ctx, req.UserID); err != nil { + if err := s.userClient.CheckUser(ctx, []string{req.UserID}); err != nil { return nil, err } @@ -112,7 +118,7 @@ func (s *authServer) GetUserToken(ctx context.Context, req *pbauth.GetUserTokenR if authverify.IsManagerUserID(req.UserID, s.config.Share.IMAdminUserID) { return nil, errs.ErrNoPermission.WrapMsg("don't get Admin token") } - if _, err := rpcclient.GetUserInfo(ctx, req.UserID); err != nil { + if err := s.userClient.CheckUser(ctx, []string{req.UserID}); err != nil { return nil, err } token, err := s.authDatabase.CreateToken(ctx, req.UserID, int(req.PlatformID)) @@ -153,10 +159,7 @@ func (s *authServer) parseToken(ctx context.Context, tokensString string) (claim return nil, servererrs.ErrTokenNotExist.Wrap() } -func (s *authServer) ParseToken( - ctx context.Context, - req *pbauth.ParseTokenReq, -) (resp *pbauth.ParseTokenResp, err error) { +func (s *authServer) ParseToken(ctx context.Context, req *pbauth.ParseTokenReq) (resp *pbauth.ParseTokenResp, err error) { resp = &pbauth.ParseTokenResp{} claims, err := s.parseToken(ctx, req.Token) if err != nil { diff --git a/internal/rpc/conversation/conversation.go b/internal/rpc/conversation/conversation.go index 24d0a08e0..53364ff86 100644 --- a/internal/rpc/conversation/conversation.go +++ b/internal/rpc/conversation/conversation.go @@ -32,7 +32,6 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/common/convert" "github.com/openimsdk/open-im-server/v3/pkg/common/servererrs" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller" - "github.com/openimsdk/open-im-server/v3/pkg/rpcclient" "github.com/openimsdk/protocol/constant" pbconversation "github.com/openimsdk/protocol/conversation" "github.com/openimsdk/protocol/sdkws" @@ -50,7 +49,8 @@ type conversationServer struct { conversationNotificationSender *ConversationNotificationSender config *Config - // todo + + userClient *rpcli.UserClient msgClient *rpcli.MsgClient groupClient *rpcli.GroupClient } @@ -78,11 +78,27 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg if err != nil { return err } + userConn, err := client.GetConn(ctx, config.Discovery.RpcService.User) + if err != nil { + return err + } + groupConn, err := client.GetConn(ctx, config.Discovery.RpcService.Group) + if err != nil { + return err + } + msgConn, err := client.GetConn(ctx, config.Discovery.RpcService.Msg) + if err != nil { + return err + } + msgClient := rpcli.NewMsgClient(msgConn) localcache.InitLocalCache(&config.LocalCacheConfig) pbconversation.RegisterConversationServer(server, &conversationServer{ - conversationNotificationSender: NewConversationNotificationSender(&config.NotificationConfig), + conversationNotificationSender: NewConversationNotificationSender(&config.NotificationConfig, msgClient), conversationDatabase: controller.NewConversationDatabase(conversationDB, redis.NewConversationRedis(rdb, &config.LocalCacheConfig, redis.GetRocksCacheOptions(), conversationDB), mgocli.GetTx()), + userClient: rpcli.NewUserClient(userConn), + groupClient: rpcli.NewGroupClient(groupConn), + msgClient: msgClient, }) return nil } @@ -560,7 +576,7 @@ func (c *conversationServer) getConversationInfo( } } if len(sendIDs) != 0 { - sendInfos, err := rpcclient.GetUsersInfo(ctx, sendIDs) + sendInfos, err := c.userClient.GetUsersInfo(ctx, sendIDs) if err != nil { return nil, err } diff --git a/internal/rpc/conversation/notification.go b/internal/rpc/conversation/notification.go index f94c0cd07..c07e623af 100644 --- a/internal/rpc/conversation/notification.go +++ b/internal/rpc/conversation/notification.go @@ -16,6 +16,8 @@ package conversation import ( "context" + "github.com/openimsdk/open-im-server/v3/pkg/rpcli" + "github.com/openimsdk/protocol/msg" "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/rpcclient" @@ -27,8 +29,10 @@ type ConversationNotificationSender struct { *rpcclient.NotificationSender } -func NewConversationNotificationSender(conf *config.Notification) *ConversationNotificationSender { - return &ConversationNotificationSender{rpcclient.NewNotificationSender(conf, rpcclient.WithRpcClient())} +func NewConversationNotificationSender(conf *config.Notification, msgClient *rpcli.MsgClient) *ConversationNotificationSender { + return &ConversationNotificationSender{rpcclient.NewNotificationSender(conf, rpcclient.WithRpcClient(func(ctx context.Context, req *msg.SendMsgReq) (*msg.SendMsgResp, error) { + return msgClient.SendMsg(ctx, req) + }))} } // SetPrivate invote. diff --git a/internal/rpc/group/group.go b/internal/rpc/group/group.go index 131cb461b..c24149d43 100644 --- a/internal/rpc/group/group.go +++ b/internal/rpc/group/group.go @@ -36,9 +36,7 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/common/webhook" "github.com/openimsdk/open-im-server/v3/pkg/localcache" "github.com/openimsdk/open-im-server/v3/pkg/msgprocessor" - "github.com/openimsdk/open-im-server/v3/pkg/rpcclient" "github.com/openimsdk/open-im-server/v3/pkg/rpcclient/grouphash" - "github.com/openimsdk/open-im-server/v3/pkg/rpcclient/notification" "github.com/openimsdk/protocol/constant" pbconv "github.com/openimsdk/protocol/conversation" pbgroup "github.com/openimsdk/protocol/group" @@ -59,10 +57,11 @@ import ( type groupServer struct { pbgroup.UnimplementedGroupServer db controller.GroupDatabase - notification *GroupNotificationSender + notification *NotificationSender config *Config webhookClient *webhook.Client // todo + userClient *rpcli.UserClient msgClient *rpcli.MsgClient conversationClient *rpcli.ConversationClient } @@ -99,24 +98,33 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg if err != nil { return err } - var gs groupServer - database := controller.NewGroupDatabase(rdb, &config.LocalCacheConfig, groupDB, groupMemberDB, groupRequestDB, mgocli.GetTx(), grouphash.NewGroupHashFromGroupServer(&gs)) - gs.db = database - gs.notification = NewGroupNotificationSender( - database, - config, - func(ctx context.Context, userIDs []string) ([]notification.CommonUser, error) { - users, err := rpcclient.GetUsersInfo(ctx, userIDs) - if err != nil { - return nil, err - } - return datautil.Slice(users, func(e *sdkws.UserInfo) notification.CommonUser { return e }), nil - }, - ) + //userRpcClient := rpcclient.NewUserRpcClient(client, config.Share.RpcRegisterName.User, config.Share.IMAdminUserID) + //msgRpcClient := rpcclient.NewMessageRpcClient(client, config.Share.RpcRegisterName.Msg) + //conversationRpcClient := rpcclient.NewConversationRpcClient(client, config.Share.RpcRegisterName.Conversation) + + userConn, err := client.GetConn(ctx, config.Discovery.RpcService.User) + if err != nil { + return err + } + msgConn, err := client.GetConn(ctx, config.Discovery.RpcService.Msg) + if err != nil { + return err + } + conversationConn, err := client.GetConn(ctx, config.Discovery.RpcService.Conversation) + if err != nil { + return err + } + gs := groupServer{ + config: config, + webhookClient: webhook.NewWebhookClient(config.WebhooksConfig.URL), + userClient: rpcli.NewUserClient(userConn), + msgClient: rpcli.NewMsgClient(msgConn), + conversationClient: rpcli.NewConversationClient(conversationConn), + } + gs.db = controller.NewGroupDatabase(rdb, &config.LocalCacheConfig, groupDB, groupMemberDB, groupRequestDB, mgocli.GetTx(), grouphash.NewGroupHashFromGroupServer(&gs)) + gs.notification = NewNotificationSender(gs.db, config, gs.userClient, gs.msgClient, gs.conversationClient) localcache.InitLocalCache(&config.LocalCacheConfig) - gs.config = config - gs.webhookClient = webhook.NewWebhookClient(config.WebhooksConfig.URL) pbgroup.RegisterGroupServer(server, &gs) return nil } @@ -160,19 +168,6 @@ func (g *groupServer) CheckGroupAdmin(ctx context.Context, groupID string) error return nil } -func (g *groupServer) GetPublicUserInfoMap(ctx context.Context, userIDs []string) (map[string]*sdkws.PublicUserInfo, error) { - if len(userIDs) == 0 { - return map[string]*sdkws.PublicUserInfo{}, nil - } - users, err := rpcclient.GetPublicUserInfos(ctx, userIDs) - if err != nil { - return nil, err - } - return datautil.SliceToMapAny(users, func(e *sdkws.PublicUserInfo) (string, *sdkws.PublicUserInfo) { - return e.UserID, e - }), nil -} - func (g *groupServer) IsNotFound(err error) bool { return errs.ErrRecordNotFound.Is(specialerror.ErrCode(errs.Unwrap(err))) } @@ -214,7 +209,6 @@ func (g *groupServer) CreateGroup(ctx context.Context, req *pbgroup.CreateGroupR return nil, errs.ErrArgs.WrapMsg("no group owner") } if err := authverify.CheckAccessV3(ctx, req.OwnerUserID, g.config.Share.IMAdminUserID); err != nil { - return nil, err } userIDs := append(append(req.MemberUserIDs, req.AdminUserIDs...), req.OwnerUserID) @@ -227,7 +221,7 @@ func (g *groupServer) CreateGroup(ctx context.Context, req *pbgroup.CreateGroupR return nil, errs.ErrArgs.WrapMsg("group member repeated") } - userMap, err := rpcclient.GetUsersInfoMap(ctx, userIDs) + userMap, err := g.userClient.GetUsersInfoMap(ctx, userIDs) if err != nil { return nil, err } @@ -378,7 +372,7 @@ func (g *groupServer) InviteUserToGroup(ctx context.Context, req *pbgroup.Invite return nil, servererrs.ErrDismissedAlready.WrapMsg("group dismissed checking group status found it dismissed") } - userMap, err := rpcclient.GetUsersInfoMap(ctx, req.InvitedUserIDs) + userMap, err := g.userClient.GetUsersInfoMap(ctx, req.InvitedUserIDs) if err != nil { return nil, err } @@ -689,7 +683,7 @@ func (g *groupServer) GetGroupApplicationList(ctx context.Context, req *pbgroup. userIDs = append(userIDs, gr.UserID) } userIDs = datautil.Distinct(userIDs) - userMap, err := rpcclient.GetPublicUserInfoMap(ctx, userIDs) + userMap, err := g.userClient.GetUsersInfoMap(ctx, userIDs) if err != nil { return nil, err } @@ -801,7 +795,7 @@ func (g *groupServer) GroupApplicationResponse(ctx context.Context, req *pbgroup } else if !g.IsNotFound(err) { return nil, err } - if _, err := rpcclient.GetPublicUserInfo(ctx, req.FromUserID); err != nil { + if err := g.userClient.CheckUser(ctx, []string{req.FromUserID}); err != nil { return nil, err } var member *model.GroupMember @@ -845,7 +839,7 @@ func (g *groupServer) GroupApplicationResponse(ctx context.Context, req *pbgroup } func (g *groupServer) JoinGroup(ctx context.Context, req *pbgroup.JoinGroupReq) (*pbgroup.JoinGroupResp, error) { - user, err := rpcclient.GetUserInfo(ctx, req.InviterUserID) + user, err := g.userClient.GetUserInfo(ctx, req.InviterUserID) if err != nil { return nil, err } @@ -1297,7 +1291,7 @@ func (g *groupServer) GetGroupMembersCMS(ctx context.Context, req *pbgroup.GetGr } func (g *groupServer) GetUserReqApplicationList(ctx context.Context, req *pbgroup.GetUserReqApplicationListReq) (*pbgroup.GetUserReqApplicationListResp, error) { - user, err := rpcclient.GetPublicUserInfo(ctx, req.UserID) + user, err := g.userClient.GetUserInfo(ctx, req.UserID) if err != nil { return nil, err } @@ -1753,7 +1747,7 @@ func (g *groupServer) GetGroupUsersReqApplicationList(ctx context.Context, req * return nil, servererrs.ErrGroupIDNotFound.WrapMsg(strings.Join(ids, ",")) } - userMap, err := rpcclient.GetPublicUserInfoMap(ctx, req.UserIDs) + userMap, err := g.userClient.GetUsersInfoMap(ctx, req.UserIDs) if err != nil { return nil, err } @@ -1784,7 +1778,7 @@ func (g *groupServer) GetGroupUsersReqApplicationList(ctx context.Context, req * ownerUserID = owner.UserID } - var userInfo *sdkws.PublicUserInfo + var userInfo *sdkws.UserInfo if user, ok := userMap[e.UserID]; !ok { userInfo = user } @@ -1830,7 +1824,7 @@ func (g *groupServer) GetSpecifiedUserGroupRequestInfo(ctx context.Context, req return nil, err } - userInfos, err := rpcclient.GetPublicUserInfos(ctx, []string{req.UserID}) + userInfos, err := g.userClient.GetUsersInfo(ctx, []string{req.UserID}) if err != nil { return nil, err } diff --git a/internal/rpc/group/notification.go b/internal/rpc/group/notification.go index 4a9be4ec0..80b24815d 100644 --- a/internal/rpc/group/notification.go +++ b/internal/rpc/group/notification.go @@ -18,6 +18,7 @@ import ( "context" "errors" "fmt" + "github.com/openimsdk/open-im-server/v3/pkg/rpcli" "time" "github.com/openimsdk/open-im-server/v3/pkg/authverify" @@ -31,10 +32,8 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/rpcclient" "github.com/openimsdk/open-im-server/v3/pkg/rpcclient/notification" "github.com/openimsdk/protocol/constant" - pbconv "github.com/openimsdk/protocol/conversation" pbgroup "github.com/openimsdk/protocol/group" "github.com/openimsdk/protocol/msg" - "github.com/openimsdk/protocol/rpccall" "github.com/openimsdk/protocol/sdkws" "github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/log" @@ -50,27 +49,38 @@ const ( adminReceiver ) -func NewGroupNotificationSender( - db controller.GroupDatabase, - config *Config, - fn func(ctx context.Context, userIDs []string) ([]notification.CommonUser, error), -) *GroupNotificationSender { - return &GroupNotificationSender{ - NotificationSender: rpcclient.NewNotificationSender(&config.NotificationConfig, rpcclient.WithRpcClient(), rpcclient.WithUserRpcClient()), - getUsersInfo: fn, +func NewNotificationSender(db controller.GroupDatabase, config *Config, userClient *rpcli.UserClient, msgClient *rpcli.MsgClient, conversationClient *rpcli.ConversationClient) *NotificationSender { + return &NotificationSender{ + NotificationSender: rpcclient.NewNotificationSender(&config.NotificationConfig, + rpcclient.WithRpcClient(func(ctx context.Context, req *msg.SendMsgReq) (*msg.SendMsgResp, error) { + return msgClient.SendMsg(ctx, req) + }), + rpcclient.WithUserRpcClient(userClient.GetUserInfo), + ), + getUsersInfo: func(ctx context.Context, userIDs []string) ([]notification.CommonUser, error) { + users, err := userClient.GetUsersInfo(ctx, userIDs) + if err != nil { + return nil, err + } + return datautil.Slice(users, func(e *sdkws.UserInfo) notification.CommonUser { return e }), nil + }, db: db, config: config, + msgClient: msgClient, + conversationClient: conversationClient, } } -type GroupNotificationSender struct { +type NotificationSender struct { *rpcclient.NotificationSender - getUsersInfo func(ctx context.Context, userIDs []string) ([]notification.CommonUser, error) - db controller.GroupDatabase - config *Config + getUsersInfo func(ctx context.Context, userIDs []string) ([]notification.CommonUser, error) + db controller.GroupDatabase + config *Config + msgClient *rpcli.MsgClient + conversationClient *rpcli.ConversationClient } -func (g *GroupNotificationSender) PopulateGroupMember(ctx context.Context, members ...*model.GroupMember) error { +func (g *NotificationSender) PopulateGroupMember(ctx context.Context, members ...*model.GroupMember) error { if len(members) == 0 { return nil } @@ -105,7 +115,7 @@ func (g *GroupNotificationSender) PopulateGroupMember(ctx context.Context, membe return nil } -func (g *GroupNotificationSender) getUser(ctx context.Context, userID string) (*sdkws.PublicUserInfo, error) { +func (g *NotificationSender) getUser(ctx context.Context, userID string) (*sdkws.PublicUserInfo, error) { users, err := g.getUsersInfo(ctx, []string{userID}) if err != nil { return nil, err @@ -121,7 +131,7 @@ func (g *GroupNotificationSender) getUser(ctx context.Context, userID string) (* }, nil } -func (g *GroupNotificationSender) getGroupInfo(ctx context.Context, groupID string) (*sdkws.GroupInfo, error) { +func (g *NotificationSender) getGroupInfo(ctx context.Context, groupID string) (*sdkws.GroupInfo, error) { gm, err := g.db.TakeGroup(ctx, groupID) if err != nil { return nil, err @@ -142,7 +152,7 @@ func (g *GroupNotificationSender) getGroupInfo(ctx context.Context, groupID stri return convert.Db2PbGroupInfo(gm, ownerUserID, num), nil } -func (g *GroupNotificationSender) getGroupMembers(ctx context.Context, groupID string, userIDs []string) ([]*sdkws.GroupMemberFullInfo, error) { +func (g *NotificationSender) getGroupMembers(ctx context.Context, groupID string, userIDs []string) ([]*sdkws.GroupMemberFullInfo, error) { members, err := g.db.FindGroupMembers(ctx, groupID, userIDs) if err != nil { return nil, err @@ -158,7 +168,7 @@ func (g *GroupNotificationSender) getGroupMembers(ctx context.Context, groupID s return res, nil } -func (g *GroupNotificationSender) getGroupMemberMap(ctx context.Context, groupID string, userIDs []string) (map[string]*sdkws.GroupMemberFullInfo, error) { +func (g *NotificationSender) getGroupMemberMap(ctx context.Context, groupID string, userIDs []string) (map[string]*sdkws.GroupMemberFullInfo, error) { members, err := g.getGroupMembers(ctx, groupID, userIDs) if err != nil { return nil, err @@ -170,7 +180,7 @@ func (g *GroupNotificationSender) getGroupMemberMap(ctx context.Context, groupID return m, nil } -func (g *GroupNotificationSender) getGroupMember(ctx context.Context, groupID string, userID string) (*sdkws.GroupMemberFullInfo, error) { +func (g *NotificationSender) getGroupMember(ctx context.Context, groupID string, userID string) (*sdkws.GroupMemberFullInfo, error) { members, err := g.getGroupMembers(ctx, groupID, []string{userID}) if err != nil { return nil, err @@ -181,7 +191,7 @@ func (g *GroupNotificationSender) getGroupMember(ctx context.Context, groupID st return members[0], nil } -func (g *GroupNotificationSender) getGroupOwnerAndAdminUserID(ctx context.Context, groupID string) ([]string, error) { +func (g *NotificationSender) getGroupOwnerAndAdminUserID(ctx context.Context, groupID string) ([]string, error) { members, err := g.db.FindGroupMemberRoleLevels(ctx, groupID, []int32{constant.GroupOwner, constant.GroupAdmin}) if err != nil { return nil, err @@ -193,7 +203,7 @@ func (g *GroupNotificationSender) getGroupOwnerAndAdminUserID(ctx context.Contex return datautil.Slice(members, fn), nil } -func (g *GroupNotificationSender) groupMemberDB2PB(member *model.GroupMember, appMangerLevel int32) *sdkws.GroupMemberFullInfo { +func (g *NotificationSender) groupMemberDB2PB(member *model.GroupMember, appMangerLevel int32) *sdkws.GroupMemberFullInfo { return &sdkws.GroupMemberFullInfo{ GroupID: member.GroupID, UserID: member.UserID, @@ -210,7 +220,7 @@ func (g *GroupNotificationSender) groupMemberDB2PB(member *model.GroupMember, ap } } -/* func (g *GroupNotificationSender) getUsersInfoMap(ctx context.Context, userIDs []string) (map[string]*sdkws.UserInfo, error) { +/* func (g *NotificationSender) getUsersInfoMap(ctx context.Context, userIDs []string) (map[string]*sdkws.UserInfo, error) { users, err := g.getUsersInfo(ctx, userIDs) if err != nil { return nil, err @@ -222,11 +232,11 @@ func (g *GroupNotificationSender) groupMemberDB2PB(member *model.GroupMember, ap return result, nil } */ -func (g *GroupNotificationSender) fillOpUser(ctx context.Context, opUser **sdkws.GroupMemberFullInfo, groupID string) (err error) { +func (g *NotificationSender) fillOpUser(ctx context.Context, opUser **sdkws.GroupMemberFullInfo, groupID string) (err error) { return g.fillOpUserByUserID(ctx, mcontext.GetOpUserID(ctx), opUser, groupID) } -func (g *GroupNotificationSender) fillOpUserByUserID(ctx context.Context, userID string, opUser **sdkws.GroupMemberFullInfo, groupID string) error { +func (g *NotificationSender) fillOpUserByUserID(ctx context.Context, userID string, opUser **sdkws.GroupMemberFullInfo, groupID string) error { if opUser == nil { return errs.ErrInternalServer.WrapMsg("**sdkws.GroupMemberFullInfo is nil") } @@ -270,7 +280,7 @@ func (g *GroupNotificationSender) fillOpUserByUserID(ctx context.Context, userID return nil } -func (g *GroupNotificationSender) setVersion(ctx context.Context, version *uint64, versionID *string, collName string, id string) { +func (g *NotificationSender) setVersion(ctx context.Context, version *uint64, versionID *string, collName string, id string) { versions := versionctx.GetVersionLog(ctx).Get() for _, coll := range versions { if coll.Name == collName && coll.Doc.DID == id { @@ -281,7 +291,7 @@ func (g *GroupNotificationSender) setVersion(ctx context.Context, version *uint6 } } -func (g *GroupNotificationSender) setSortVersion(ctx context.Context, version *uint64, versionID *string, collName string, id string, sortVersion *uint64) { +func (g *NotificationSender) setSortVersion(ctx context.Context, version *uint64, versionID *string, collName string, id string, sortVersion *uint64) { versions := versionctx.GetVersionLog(ctx).Get() for _, coll := range versions { if coll.Name == collName && coll.Doc.DID == id { @@ -296,7 +306,7 @@ func (g *GroupNotificationSender) setSortVersion(ctx context.Context, version *u } } -func (g *GroupNotificationSender) GroupCreatedNotification(ctx context.Context, tips *sdkws.GroupCreatedTips) { +func (g *NotificationSender) GroupCreatedNotification(ctx context.Context, tips *sdkws.GroupCreatedTips) { var err error defer func() { if err != nil { @@ -310,7 +320,7 @@ func (g *GroupNotificationSender) GroupCreatedNotification(ctx context.Context, g.Notification(ctx, mcontext.GetOpUserID(ctx), tips.Group.GroupID, constant.GroupCreatedNotification, tips) } -func (g *GroupNotificationSender) GroupInfoSetNotification(ctx context.Context, tips *sdkws.GroupInfoSetTips) { +func (g *NotificationSender) GroupInfoSetNotification(ctx context.Context, tips *sdkws.GroupInfoSetTips) { var err error defer func() { if err != nil { @@ -324,7 +334,7 @@ func (g *GroupNotificationSender) GroupInfoSetNotification(ctx context.Context, g.Notification(ctx, mcontext.GetOpUserID(ctx), tips.Group.GroupID, constant.GroupInfoSetNotification, tips, rpcclient.WithRpcGetUserName()) } -func (g *GroupNotificationSender) GroupInfoSetNameNotification(ctx context.Context, tips *sdkws.GroupInfoSetNameTips) { +func (g *NotificationSender) GroupInfoSetNameNotification(ctx context.Context, tips *sdkws.GroupInfoSetNameTips) { var err error defer func() { if err != nil { @@ -338,7 +348,7 @@ func (g *GroupNotificationSender) GroupInfoSetNameNotification(ctx context.Conte g.Notification(ctx, mcontext.GetOpUserID(ctx), tips.Group.GroupID, constant.GroupInfoSetNameNotification, tips) } -func (g *GroupNotificationSender) GroupInfoSetAnnouncementNotification(ctx context.Context, tips *sdkws.GroupInfoSetAnnouncementTips) { +func (g *NotificationSender) GroupInfoSetAnnouncementNotification(ctx context.Context, tips *sdkws.GroupInfoSetAnnouncementTips) { var err error defer func() { if err != nil { @@ -352,7 +362,7 @@ func (g *GroupNotificationSender) GroupInfoSetAnnouncementNotification(ctx conte g.Notification(ctx, mcontext.GetOpUserID(ctx), tips.Group.GroupID, constant.GroupInfoSetAnnouncementNotification, tips, rpcclient.WithRpcGetUserName()) } -func (g *GroupNotificationSender) JoinGroupApplicationNotification(ctx context.Context, req *pbgroup.JoinGroupReq) { +func (g *NotificationSender) JoinGroupApplicationNotification(ctx context.Context, req *pbgroup.JoinGroupReq) { var err error defer func() { if err != nil { @@ -380,7 +390,7 @@ func (g *GroupNotificationSender) JoinGroupApplicationNotification(ctx context.C } } -func (g *GroupNotificationSender) MemberQuitNotification(ctx context.Context, member *sdkws.GroupMemberFullInfo) { +func (g *NotificationSender) MemberQuitNotification(ctx context.Context, member *sdkws.GroupMemberFullInfo) { var err error defer func() { if err != nil { @@ -397,7 +407,7 @@ func (g *GroupNotificationSender) MemberQuitNotification(ctx context.Context, me g.Notification(ctx, mcontext.GetOpUserID(ctx), member.GroupID, constant.MemberQuitNotification, tips) } -func (g *GroupNotificationSender) GroupApplicationAcceptedNotification(ctx context.Context, req *pbgroup.GroupApplicationResponseReq) { +func (g *NotificationSender) GroupApplicationAcceptedNotification(ctx context.Context, req *pbgroup.GroupApplicationResponseReq) { var err error defer func() { if err != nil { @@ -430,7 +440,7 @@ func (g *GroupNotificationSender) GroupApplicationAcceptedNotification(ctx conte } } -func (g *GroupNotificationSender) GroupApplicationRejectedNotification(ctx context.Context, req *pbgroup.GroupApplicationResponseReq) { +func (g *NotificationSender) GroupApplicationRejectedNotification(ctx context.Context, req *pbgroup.GroupApplicationResponseReq) { var err error defer func() { if err != nil { @@ -463,7 +473,7 @@ func (g *GroupNotificationSender) GroupApplicationRejectedNotification(ctx conte } } -func (g *GroupNotificationSender) GroupOwnerTransferredNotification(ctx context.Context, req *pbgroup.TransferGroupOwnerReq) { +func (g *NotificationSender) GroupOwnerTransferredNotification(ctx context.Context, req *pbgroup.TransferGroupOwnerReq) { var err error defer func() { if err != nil { @@ -494,7 +504,7 @@ func (g *GroupNotificationSender) GroupOwnerTransferredNotification(ctx context. g.Notification(ctx, mcontext.GetOpUserID(ctx), group.GroupID, constant.GroupOwnerTransferredNotification, tips) } -func (g *GroupNotificationSender) MemberKickedNotification(ctx context.Context, tips *sdkws.MemberKickedTips) { +func (g *NotificationSender) MemberKickedNotification(ctx context.Context, tips *sdkws.MemberKickedTips) { var err error defer func() { if err != nil { @@ -508,7 +518,7 @@ func (g *GroupNotificationSender) MemberKickedNotification(ctx context.Context, g.Notification(ctx, mcontext.GetOpUserID(ctx), tips.Group.GroupID, constant.MemberKickedNotification, tips) } -func (g *GroupNotificationSender) GroupApplicationAgreeMemberEnterNotification(ctx context.Context, groupID string, invitedOpUserID string, entrantUserID ...string) error { +func (g *NotificationSender) GroupApplicationAgreeMemberEnterNotification(ctx context.Context, groupID string, invitedOpUserID string, entrantUserID ...string) error { var err error defer func() { if err != nil { @@ -518,30 +528,15 @@ func (g *GroupNotificationSender) GroupApplicationAgreeMemberEnterNotification(c if !g.config.RpcConfig.EnableHistoryForNewMembers { conversationID := msgprocessor.GetConversationIDBySessionType(constant.ReadGroupChatType, groupID) - - maxSeq, err := rpccall.ExtractField(ctx, msg.GetConversationMaxSeqCaller.Invoke, - &msg.GetConversationMaxSeqReq{ConversationID: conversationID}, - (*msg.GetConversationMaxSeqResp).GetMaxSeq) - maxSeq,err := g. - - + maxSeq, err := g.msgClient.GetConversationMaxSeq(ctx, conversationID) if err != nil { return err } - - if err := msg.SetUserConversationsMinSeqCaller.Execute(ctx, &msg.SetUserConversationsMinSeqReq{ - UserIDs: entrantUserID, - ConversationID: conversationID, - Seq: maxSeq, - }); err != nil { + if err := g.msgClient.SetUserConversationsMinSeq(ctx, conversationID, entrantUserID, maxSeq); err != nil { return err } } - - if err := pbconv.CreateGroupChatConversationsCaller.Execute(ctx, &pbconv.CreateGroupChatConversationsReq{ - UserIDs: entrantUserID, - GroupID: groupID, - }); err != nil { + if err := g.conversationClient.CreateGroupChatConversations(ctx, groupID, entrantUserID); err != nil { return err } @@ -577,7 +572,7 @@ func (g *GroupNotificationSender) GroupApplicationAgreeMemberEnterNotification(c return nil } -func (g *GroupNotificationSender) MemberEnterNotification(ctx context.Context, groupID string, entrantUserID string) error { +func (g *NotificationSender) MemberEnterNotification(ctx context.Context, groupID string, entrantUserID string) error { var err error defer func() { if err != nil { @@ -587,28 +582,17 @@ func (g *GroupNotificationSender) MemberEnterNotification(ctx context.Context, g if !g.config.RpcConfig.EnableHistoryForNewMembers { conversationID := msgprocessor.GetConversationIDBySessionType(constant.ReadGroupChatType, groupID) - maxSeq, err := rpccall.ExtractField(ctx, msg.GetConversationMaxSeqCaller.Invoke, - &msg.GetConversationMaxSeqReq{ConversationID: conversationID}, - (*msg.GetConversationMaxSeqResp).GetMaxSeq) + maxSeq, err := g.msgClient.GetConversationMaxSeq(ctx, conversationID) if err != nil { return err } - if err := msg.SetUserConversationsMinSeqCaller.Execute(ctx, &msg.SetUserConversationsMinSeqReq{ - UserIDs: []string{entrantUserID}, - ConversationID: conversationID, - Seq: maxSeq, - }); err != nil { + if err := g.msgClient.SetUserConversationsMinSeq(ctx, conversationID, []string{entrantUserID}, maxSeq); err != nil { return err } } - - if err := pbconv.CreateGroupChatConversationsCaller.Execute(ctx, &pbconv.CreateGroupChatConversationsReq{ - UserIDs: []string{entrantUserID}, - GroupID: groupID, - }); err != nil { + if err := g.conversationClient.CreateGroupChatConversations(ctx, groupID, []string{entrantUserID}); err != nil { return err } - var group *sdkws.GroupInfo group, err = g.getGroupInfo(ctx, groupID) if err != nil { @@ -629,7 +613,7 @@ func (g *GroupNotificationSender) MemberEnterNotification(ctx context.Context, g return nil } -func (g *GroupNotificationSender) GroupDismissedNotification(ctx context.Context, tips *sdkws.GroupDismissedTips) { +func (g *NotificationSender) GroupDismissedNotification(ctx context.Context, tips *sdkws.GroupDismissedTips) { var err error defer func() { if err != nil { @@ -642,7 +626,7 @@ func (g *GroupNotificationSender) GroupDismissedNotification(ctx context.Context g.Notification(ctx, mcontext.GetOpUserID(ctx), tips.Group.GroupID, constant.GroupDismissedNotification, tips) } -func (g *GroupNotificationSender) GroupMemberMutedNotification(ctx context.Context, groupID, groupMemberUserID string, mutedSeconds uint32) { +func (g *NotificationSender) GroupMemberMutedNotification(ctx context.Context, groupID, groupMemberUserID string, mutedSeconds uint32) { var err error defer func() { if err != nil { @@ -670,7 +654,7 @@ func (g *GroupNotificationSender) GroupMemberMutedNotification(ctx context.Conte g.Notification(ctx, mcontext.GetOpUserID(ctx), group.GroupID, constant.GroupMemberMutedNotification, tips) } -func (g *GroupNotificationSender) GroupMemberCancelMutedNotification(ctx context.Context, groupID, groupMemberUserID string) { +func (g *NotificationSender) GroupMemberCancelMutedNotification(ctx context.Context, groupID, groupMemberUserID string) { var err error defer func() { if err != nil { @@ -695,7 +679,7 @@ func (g *GroupNotificationSender) GroupMemberCancelMutedNotification(ctx context g.Notification(ctx, mcontext.GetOpUserID(ctx), group.GroupID, constant.GroupMemberCancelMutedNotification, tips) } -func (g *GroupNotificationSender) GroupMutedNotification(ctx context.Context, groupID string) { +func (g *NotificationSender) GroupMutedNotification(ctx context.Context, groupID string) { var err error defer func() { if err != nil { @@ -723,7 +707,7 @@ func (g *GroupNotificationSender) GroupMutedNotification(ctx context.Context, gr g.Notification(ctx, mcontext.GetOpUserID(ctx), group.GroupID, constant.GroupMutedNotification, tips) } -func (g *GroupNotificationSender) GroupCancelMutedNotification(ctx context.Context, groupID string) { +func (g *NotificationSender) GroupCancelMutedNotification(ctx context.Context, groupID string) { var err error defer func() { if err != nil { @@ -751,7 +735,7 @@ func (g *GroupNotificationSender) GroupCancelMutedNotification(ctx context.Conte g.Notification(ctx, mcontext.GetOpUserID(ctx), group.GroupID, constant.GroupCancelMutedNotification, tips) } -func (g *GroupNotificationSender) GroupMemberInfoSetNotification(ctx context.Context, groupID, groupMemberUserID string) { +func (g *NotificationSender) GroupMemberInfoSetNotification(ctx context.Context, groupID, groupMemberUserID string) { var err error defer func() { if err != nil { @@ -776,7 +760,7 @@ func (g *GroupNotificationSender) GroupMemberInfoSetNotification(ctx context.Con g.Notification(ctx, mcontext.GetOpUserID(ctx), group.GroupID, constant.GroupMemberInfoSetNotification, tips) } -func (g *GroupNotificationSender) GroupMemberSetToAdminNotification(ctx context.Context, groupID, groupMemberUserID string) { +func (g *NotificationSender) GroupMemberSetToAdminNotification(ctx context.Context, groupID, groupMemberUserID string) { var err error defer func() { if err != nil { @@ -800,7 +784,7 @@ func (g *GroupNotificationSender) GroupMemberSetToAdminNotification(ctx context. g.Notification(ctx, mcontext.GetOpUserID(ctx), group.GroupID, constant.GroupMemberSetToAdminNotification, tips) } -func (g *GroupNotificationSender) GroupMemberSetToOrdinaryUserNotification(ctx context.Context, groupID, groupMemberUserID string) { +func (g *NotificationSender) GroupMemberSetToOrdinaryUserNotification(ctx context.Context, groupID, groupMemberUserID string) { var err error defer func() { if err != nil { diff --git a/internal/rpc/msg/server.go b/internal/rpc/msg/server.go index 29219864a..6c4747ed1 100644 --- a/internal/rpc/msg/server.go +++ b/internal/rpc/msg/server.go @@ -55,6 +55,7 @@ type Config struct { // MsgServer encapsulates dependencies required for message handling. type msgServer struct { + msg.UnimplementedMsgServer RegisterCenter discovery.SvcDiscoveryRegistry // Service discovery registry for service registration. MsgDatabase controller.CommonMsgDatabase // Interface for message database operations. StreamMsgDatabase controller.StreamMsgDatabase @@ -67,9 +68,7 @@ type msgServer struct { msgNotificationSender *MsgNotificationSender // RPC client for sending msg notifications. config *Config // Global configuration settings. webhookClient *webhook.Client - msg.UnimplementedMsgServer - // todo - conversationClient rpcli.ConversationClient + conversationClient *rpcli.ConversationClient } func (m *msgServer) addInterceptorHandler(interceptorFunc ...MessageInterceptorFunc) { @@ -109,16 +108,34 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg if err != nil { return err } + userConn, err := client.GetConn(ctx, config.Discovery.RpcService.User) + if err != nil { + return err + } + groupConn, err := client.GetConn(ctx, config.Discovery.RpcService.Group) + if err != nil { + return err + } + friendConn, err := client.GetConn(ctx, config.Discovery.RpcService.Friend) + if err != nil { + return err + } + conversationConn, err := client.GetConn(ctx, config.Discovery.RpcService.Conversation) + if err != nil { + return err + } + conversationClient := rpcli.NewConversationClient(conversationConn) s := &msgServer{ MsgDatabase: msgDatabase, StreamMsgDatabase: controller.NewStreamMsgDatabase(streamMsg), RegisterCenter: client, - UserLocalCache: rpccache.NewUserLocalCache(&config.LocalCacheConfig, rdb), - GroupLocalCache: rpccache.NewGroupLocalCache(&config.LocalCacheConfig, rdb), - ConversationLocalCache: rpccache.NewConversationLocalCache(&config.LocalCacheConfig, rdb), - FriendLocalCache: rpccache.NewFriendLocalCache(&config.LocalCacheConfig, rdb), + UserLocalCache: rpccache.NewUserLocalCache(rpcli.NewUserClient(userConn), &config.LocalCacheConfig, rdb), + GroupLocalCache: rpccache.NewGroupLocalCache(rpcli.NewGroupClient(groupConn), &config.LocalCacheConfig, rdb), + ConversationLocalCache: rpccache.NewConversationLocalCache(conversationClient, &config.LocalCacheConfig, rdb), + FriendLocalCache: rpccache.NewFriendLocalCache(rpcli.NewRelationClient(friendConn), &config.LocalCacheConfig, rdb), config: config, webhookClient: webhook.NewWebhookClient(config.WebhooksConfig.URL), + conversationClient: conversationClient, } s.notificationSender = rpcclient.NewNotificationSender(&config.NotificationConfig, rpcclient.WithLocalSendMsg(s.SendMsg)) diff --git a/internal/rpc/relation/black.go b/internal/rpc/relation/black.go index 0fd9b8766..2108d7dc5 100644 --- a/internal/rpc/relation/black.go +++ b/internal/rpc/relation/black.go @@ -21,7 +21,6 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/authverify" "github.com/openimsdk/open-im-server/v3/pkg/common/convert" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" - "github.com/openimsdk/open-im-server/v3/pkg/rpcclient" "github.com/openimsdk/protocol/relation" "github.com/openimsdk/protocol/sdkws" "github.com/openimsdk/tools/errs" @@ -39,7 +38,7 @@ func (s *friendServer) GetPaginationBlacks(ctx context.Context, req *relation.Ge return nil, err } resp = &relation.GetPaginationBlacksResp{} - resp.Blacks, err = convert.BlackDB2Pb(ctx, blacks, rpcclient.GetUsersInfoMap) + resp.Blacks, err = convert.BlackDB2Pb(ctx, blacks, s.userClient.GetUsersInfoMap) if err != nil { return nil, err } @@ -81,9 +80,7 @@ func (s *friendServer) AddBlack(ctx context.Context, req *relation.AddBlackReq) if err := s.webhookBeforeAddBlack(ctx, &s.config.WebhooksConfig.BeforeAddBlack, req); err != nil { return nil, err } - - _, err := rpcclient.GetUsersInfo(ctx, []string{req.OwnerUserID, req.BlackUserID}) - if err != nil { + if err := s.userClient.CheckUser(ctx, []string{req.OwnerUserID, req.BlackUserID}); err != nil { return nil, err } black := model.Black{ @@ -114,7 +111,7 @@ func (s *friendServer) GetSpecifiedBlacks(ctx context.Context, req *relation.Get return nil, errs.ErrArgs.WrapMsg("userIDList repeated") } - userMap, err := rpcclient.GetPublicUserInfoMap(ctx, req.UserIDList) + userMap, err := s.userClient.GetUsersInfoMap(ctx, req.UserIDList) if err != nil { return nil, err } @@ -132,13 +129,26 @@ func (s *friendServer) GetSpecifiedBlacks(ctx context.Context, req *relation.Get Blacks: make([]*sdkws.BlackInfo, 0, len(req.UserIDList)), } + toPublcUser := func(userID string) *sdkws.PublicUserInfo { + v, ok := userMap[userID] + if !ok { + return nil + } + return &sdkws.PublicUserInfo{ + UserID: v.UserID, + Nickname: v.Nickname, + FaceURL: v.FaceURL, + Ex: v.Ex, + } + } + for _, userID := range req.UserIDList { if black := blackMap[userID]; black != nil { resp.Blacks = append(resp.Blacks, &sdkws.BlackInfo{ OwnerUserID: black.OwnerUserID, CreateTime: black.CreateTime.UnixMilli(), - BlackUserInfo: userMap[userID], + BlackUserInfo: toPublcUser(userID), AddSource: black.AddSource, OperatorUserID: black.OperatorUserID, Ex: black.Ex, diff --git a/internal/rpc/relation/friend.go b/internal/rpc/relation/friend.go index b97a9d01f..ed286d02d 100644 --- a/internal/rpc/relation/friend.go +++ b/internal/rpc/relation/friend.go @@ -16,6 +16,7 @@ package relation import ( "context" + "github.com/openimsdk/open-im-server/v3/pkg/rpcli" "github.com/openimsdk/tools/mq/memamq" @@ -31,7 +32,6 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/common/convert" "github.com/openimsdk/open-im-server/v3/pkg/common/servererrs" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller" - "github.com/openimsdk/open-im-server/v3/pkg/rpcclient" "github.com/openimsdk/protocol/constant" "github.com/openimsdk/protocol/relation" "github.com/openimsdk/protocol/sdkws" @@ -51,6 +51,7 @@ type friendServer struct { config *Config webhookClient *webhook.Client queue *memamq.MemoryQueue + userClient *rpcli.UserClient } type Config struct { @@ -90,10 +91,21 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg return err } + userConn, err := client.GetConn(ctx, config.Discovery.RpcService.User) + if err != nil { + return err + } + msgConn, err := client.GetConn(ctx, config.Discovery.RpcService.Msg) + if err != nil { + return err + } + userClient := rpcli.NewUserClient(userConn) + // Initialize notification sender notificationSender := NewFriendNotificationSender( &config.NotificationConfig, - WithRpcFunc(rpcclient.GetUsersInfo), + rpcli.NewMsgClient(msgConn), + WithRpcFunc(userClient.GetUsersInfo), ) localcache.InitLocalCache(&config.LocalCacheConfig) @@ -130,7 +142,7 @@ func (s *friendServer) ApplyToAddFriend(ctx context.Context, req *relation.Apply if err = s.webhookBeforeAddFriend(ctx, &s.config.WebhooksConfig.BeforeAddFriend, req); err != nil && err != servererrs.ErrCallbackContinue { return nil, err } - if _, err := rpcclient.GetUsersInfoMap(ctx, []string{req.ToUserID, req.FromUserID}); err != nil { + if err := s.userClient.CheckUser(ctx, []string{req.ToUserID, req.FromUserID}); err != nil { return nil, err } @@ -155,7 +167,7 @@ func (s *friendServer) ImportFriends(ctx context.Context, req *relation.ImportFr return nil, err } - if _, err := rpcclient.GetUsersInfo(ctx, append([]string{req.OwnerUserID}, req.FriendUserIDs...)); err != nil { + if err := s.userClient.CheckUser(ctx, append([]string{req.OwnerUserID}, req.FriendUserIDs...)); err != nil { return nil, err } if datautil.Contain(req.OwnerUserID, req.FriendUserIDs...) { @@ -296,7 +308,7 @@ func (s *friendServer) getFriend(ctx context.Context, ownerUserID string, friend if err != nil { return nil, err } - return convert.FriendsDB2Pb(ctx, friends, rpcclient.GetUsersInfoMap) + return convert.FriendsDB2Pb(ctx, friends, s.userClient.GetUsersInfoMap) } // Get the list of friend requests sent out proactively. @@ -308,7 +320,7 @@ func (s *friendServer) GetDesignatedFriendsApply(ctx context.Context, return nil, err } resp = &relation.GetDesignatedFriendsApplyResp{} - resp.FriendRequests, err = convert.FriendRequestDB2Pb(ctx, friendRequests, rpcclient.GetUsersInfoMap) + resp.FriendRequests, err = convert.FriendRequestDB2Pb(ctx, friendRequests, s.userClient.GetUsersInfoMap) if err != nil { return nil, err } @@ -327,7 +339,7 @@ func (s *friendServer) GetPaginationFriendsApplyTo(ctx context.Context, req *rel } resp = &relation.GetPaginationFriendsApplyToResp{} - resp.FriendRequests, err = convert.FriendRequestDB2Pb(ctx, friendRequests, rpcclient.GetUsersInfoMap) + resp.FriendRequests, err = convert.FriendRequestDB2Pb(ctx, friendRequests, s.userClient.GetUsersInfoMap) if err != nil { return nil, err } @@ -349,7 +361,7 @@ func (s *friendServer) GetPaginationFriendsApplyFrom(ctx context.Context, req *r return nil, err } - resp.FriendRequests, err = convert.FriendRequestDB2Pb(ctx, friendRequests, rpcclient.GetUsersInfoMap) + resp.FriendRequests, err = convert.FriendRequestDB2Pb(ctx, friendRequests, s.userClient.GetUsersInfoMap) if err != nil { return nil, err } @@ -380,7 +392,7 @@ func (s *friendServer) GetPaginationFriends(ctx context.Context, req *relation.G } resp = &relation.GetPaginationFriendsResp{} - resp.FriendsInfo, err = convert.FriendsDB2Pb(ctx, friends, rpcclient.GetUsersInfoMap) + resp.FriendsInfo, err = convert.FriendsDB2Pb(ctx, friends, s.userClient.GetUsersInfoMap) if err != nil { return nil, err } @@ -413,7 +425,7 @@ func (s *friendServer) GetSpecifiedFriendsInfo(ctx context.Context, req *relatio return nil, errs.ErrArgs.WrapMsg("userIDList repeated") } - userMap, err := rpcclient.GetUsersInfoMap(ctx, req.UserIDList) + userMap, err := s.userClient.GetUsersInfoMap(ctx, req.UserIDList) if err != nil { return nil, err } diff --git a/internal/rpc/relation/notification.go b/internal/rpc/relation/notification.go index ba4365200..6763a8986 100644 --- a/internal/rpc/relation/notification.go +++ b/internal/rpc/relation/notification.go @@ -16,6 +16,8 @@ package relation import ( "context" + "github.com/openimsdk/open-im-server/v3/pkg/rpcli" + "github.com/openimsdk/protocol/msg" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/versionctx" @@ -85,12 +87,11 @@ func WithRpcFunc( } } -func NewFriendNotificationSender( - conf *config.Notification, - opts ...friendNotificationSenderOptions, -) *FriendNotificationSender { +func NewFriendNotificationSender(conf *config.Notification, msgClient *rpcli.MsgClient, opts ...friendNotificationSenderOptions) *FriendNotificationSender { f := &FriendNotificationSender{ - NotificationSender: rpcclient.NewNotificationSender(conf, rpcclient.WithRpcClient()), + NotificationSender: rpcclient.NewNotificationSender(conf, rpcclient.WithRpcClient(func(ctx context.Context, req *msg.SendMsgReq) (*msg.SendMsgResp, error) { + return msgClient.SendMsg(ctx, req) + })), } for _, opt := range opts { opt(f) diff --git a/internal/rpc/third/log.go b/internal/rpc/third/log.go index 222cfad1d..4d8cbc0bb 100644 --- a/internal/rpc/third/log.go +++ b/internal/rpc/third/log.go @@ -19,11 +19,9 @@ import ( "crypto/rand" "time" - relationtb "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" - "github.com/openimsdk/open-im-server/v3/pkg/rpcclient" - "github.com/openimsdk/open-im-server/v3/pkg/authverify" "github.com/openimsdk/open-im-server/v3/pkg/common/servererrs" + relationtb "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" "github.com/openimsdk/protocol/constant" "github.com/openimsdk/protocol/third" "github.com/openimsdk/tools/errs" @@ -150,7 +148,7 @@ func (t *thirdServer) SearchLogs(ctx context.Context, req *third.SearchLogsReq) for _, log := range logs { userIDs = append(userIDs, log.UserID) } - userMap, err := rpcclient.GetUsersInfoMap(ctx, userIDs) + userMap, err := t.userClient.GetUsersInfoMap(ctx, userIDs) if err != nil { return nil, err } diff --git a/internal/rpc/third/third.go b/internal/rpc/third/third.go index dc964fdb1..0b8ca25a8 100644 --- a/internal/rpc/third/third.go +++ b/internal/rpc/third/third.go @@ -17,6 +17,7 @@ package third import ( "context" "fmt" + "github.com/openimsdk/open-im-server/v3/pkg/rpcli" "time" "github.com/openimsdk/open-im-server/v3/pkg/common/config" @@ -45,6 +46,7 @@ type thirdServer struct { defaultExpire time.Duration config *Config s3 s3.Interface + userClient *rpcli.UserClient } type Config struct { @@ -98,6 +100,10 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg if err != nil { return err } + userConn, err := client.GetConn(ctx, config.Discovery.RpcService.User) + if err != nil { + return err + } localcache.InitLocalCache(&config.LocalCacheConfig) third.RegisterThirdServer(server, &thirdServer{ thirdDatabase: controller.NewThirdDatabase(redis.NewThirdCache(rdb), logdb), @@ -105,6 +111,7 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg defaultExpire: time.Hour * 24 * 7, config: config, s3: o, + userClient: rpcli.NewUserClient(userConn), }) return nil } diff --git a/internal/rpc/user/notification.go b/internal/rpc/user/notification.go index 54e5b27d7..d414fa1ad 100644 --- a/internal/rpc/user/notification.go +++ b/internal/rpc/user/notification.go @@ -16,6 +16,8 @@ package user import ( "context" + "github.com/openimsdk/open-im-server/v3/pkg/rpcli" + "github.com/openimsdk/protocol/msg" relationtb "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" "github.com/openimsdk/open-im-server/v3/pkg/rpcclient/notification" @@ -59,9 +61,11 @@ func WithUserFunc( } } -func NewUserNotificationSender(config *Config, opts ...userNotificationSenderOptions) *UserNotificationSender { +func NewUserNotificationSender(config *Config, msgClient *rpcli.MsgClient, opts ...userNotificationSenderOptions) *UserNotificationSender { f := &UserNotificationSender{ - NotificationSender: rpcclient.NewNotificationSender(&config.NotificationConfig, rpcclient.WithRpcClient()), + NotificationSender: rpcclient.NewNotificationSender(&config.NotificationConfig, rpcclient.WithRpcClient(func(ctx context.Context, req *msg.SendMsgReq) (*msg.SendMsgResp, error) { + return msgClient.SendMsg(ctx, req) + })), } for _, opt := range opts { opt(f) diff --git a/internal/rpc/user/user.go b/internal/rpc/user/user.go index 8c3a54f66..b03ec3bae 100644 --- a/internal/rpc/user/user.go +++ b/internal/rpc/user/user.go @@ -95,6 +95,11 @@ func Start(ctx context.Context, config *Config, client registry.SvcDiscoveryRegi if err != nil { return err } + msgConn, err := client.GetConn(ctx, config.Discovery.RpcService.Msg) + if err != nil { + return err + } + msgClient := rpcli.NewMsgClient(msgConn) userCache := redis.NewUserCacheRedis(rdb, &config.LocalCacheConfig, userDB, redis.GetRocksCacheOptions()) database := controller.NewUserDatabase(userDB, userCache, mgocli.GetTx()) localcache.InitLocalCache(&config.LocalCacheConfig) @@ -102,8 +107,8 @@ func Start(ctx context.Context, config *Config, client registry.SvcDiscoveryRegi online: redis.NewUserOnline(rdb), db: database, RegisterCenter: client, - friendNotificationSender: relation.NewFriendNotificationSender(&config.NotificationConfig, relation.WithDBFunc(database.FindWithError)), - userNotificationSender: NewUserNotificationSender(config, WithUserFunc(database.FindWithError)), + friendNotificationSender: relation.NewFriendNotificationSender(&config.NotificationConfig, msgClient, relation.WithDBFunc(database.FindWithError)), + userNotificationSender: NewUserNotificationSender(config, msgClient, WithUserFunc(database.FindWithError)), config: config, webhookClient: webhook.NewWebhookClient(config.WebhooksConfig.URL), } diff --git a/pkg/common/convert/group.go b/pkg/common/convert/group.go index bc2b2f998..5e41599c6 100644 --- a/pkg/common/convert/group.go +++ b/pkg/common/convert/group.go @@ -80,9 +80,18 @@ func Db2PbGroupMember(m *model.GroupMember) *sdkws.GroupMemberFullInfo { } } -func Db2PbGroupRequest(m *model.GroupRequest, user *sdkws.PublicUserInfo, group *sdkws.GroupInfo) *sdkws.GroupRequest { +func Db2PbGroupRequest(m *model.GroupRequest, user *sdkws.UserInfo, group *sdkws.GroupInfo) *sdkws.GroupRequest { + var pu *sdkws.PublicUserInfo + if user != nil { + pu = &sdkws.PublicUserInfo{ + UserID: user.UserID, + Nickname: user.Nickname, + FaceURL: user.FaceURL, + Ex: user.Ex, + } + } return &sdkws.GroupRequest{ - UserInfo: user, + UserInfo: pu, GroupInfo: group, HandleResult: m.HandleResult, ReqMsg: m.ReqMsg, diff --git a/pkg/common/startrpc/start.go b/pkg/common/startrpc/start.go index 089bc3d97..fd46fb45c 100644 --- a/pkg/common/startrpc/start.go +++ b/pkg/common/startrpc/start.go @@ -27,7 +27,6 @@ import ( "time" conf "github.com/openimsdk/open-im-server/v3/pkg/common/config" - "github.com/openimsdk/open-im-server/v3/pkg/rpcclient" "github.com/openimsdk/tools/discovery/etcd" "github.com/openimsdk/tools/utils/datautil" "github.com/openimsdk/tools/utils/jsonutil" @@ -101,10 +100,6 @@ func Start[T any](ctx context.Context, discovery *conf.Discovery, prometheusConf defer client.Close() client.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, "round_robin"))) - if err = rpcclient.InitRpcCaller(client, discovery.RpcService); err != nil { - return err - } - // var reg *prometheus.Registry // var metric *grpcprometheus.ServerMetrics if prometheusConfig.Enable { diff --git a/pkg/rpccache/conversation.go b/pkg/rpccache/conversation.go index ba3690f44..f354efa59 100644 --- a/pkg/rpccache/conversation.go +++ b/pkg/rpccache/conversation.go @@ -16,12 +16,11 @@ package rpccache import ( "context" - "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey" "github.com/openimsdk/open-im-server/v3/pkg/localcache" - pbconv "github.com/openimsdk/protocol/conversation" - "github.com/openimsdk/protocol/rpccall" + "github.com/openimsdk/open-im-server/v3/pkg/rpcli" + pbconversation "github.com/openimsdk/protocol/conversation" "github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/log" "github.com/openimsdk/tools/utils/datautil" @@ -33,10 +32,11 @@ const ( conversationWorkerCount = 20 ) -func NewConversationLocalCache(localCache *config.LocalCache, cli redis.UniversalClient) *ConversationLocalCache { +func NewConversationLocalCache(client *rpcli.ConversationClient, localCache *config.LocalCache, cli redis.UniversalClient) *ConversationLocalCache { lc := localCache.Conversation log.ZDebug(context.Background(), "ConversationLocalCache", "topic", lc.Topic, "slotNum", lc.SlotNum, "slotSize", lc.SlotSize, "enable", lc.Enable()) x := &ConversationLocalCache{ + client: client, local: localcache.New[[]byte]( localcache.WithLocalSlotNum(lc.SlotNum), localcache.WithLocalSlotSize(lc.SlotSize), @@ -52,7 +52,8 @@ func NewConversationLocalCache(localCache *config.LocalCache, cli redis.Universa } type ConversationLocalCache struct { - local localcache.Cache[[]byte] + client *rpcli.ConversationClient + local localcache.Cache[[]byte] } func (c *ConversationLocalCache) GetConversationIDs(ctx context.Context, ownerUserID string) (val []string, err error) { @@ -63,7 +64,7 @@ func (c *ConversationLocalCache) GetConversationIDs(ctx context.Context, ownerUs return resp.ConversationIDs, nil } -func (c *ConversationLocalCache) getConversationIDs(ctx context.Context, ownerUserID string) (val *pbconv.GetConversationIDsResp, err error) { +func (c *ConversationLocalCache) getConversationIDs(ctx context.Context, ownerUserID string) (val *pbconversation.GetConversationIDsResp, err error) { log.ZDebug(ctx, "ConversationLocalCache getConversationIDs req", "ownerUserID", ownerUserID) defer func() { if err == nil { @@ -72,14 +73,14 @@ func (c *ConversationLocalCache) getConversationIDs(ctx context.Context, ownerUs log.ZError(ctx, "ConversationLocalCache getConversationIDs return", err, "ownerUserID", ownerUserID) } }() - var cache cacheProto[pbconv.GetConversationIDsResp] + var cache cacheProto[pbconversation.GetConversationIDsResp] return cache.Unmarshal(c.local.Get(ctx, cachekey.GetConversationIDsKey(ownerUserID), func(ctx context.Context) ([]byte, error) { log.ZDebug(ctx, "ConversationLocalCache getConversationIDs rpc", "ownerUserID", ownerUserID) - return cache.Marshal(pbconv.GetConversationIDsCaller.Invoke(ctx, &pbconv.GetConversationIDsReq{UserID: ownerUserID})) + return cache.Marshal(c.client.ConversationClient.GetConversationIDs(ctx, &pbconversation.GetConversationIDsReq{UserID: ownerUserID})) })) } -func (c *ConversationLocalCache) GetConversation(ctx context.Context, userID, conversationID string) (val *pbconv.Conversation, err error) { +func (c *ConversationLocalCache) GetConversation(ctx context.Context, userID, conversationID string) (val *pbconversation.Conversation, err error) { log.ZDebug(ctx, "ConversationLocalCache GetConversation req", "userID", userID, "conversationID", conversationID) defer func() { if err == nil { @@ -88,13 +89,10 @@ func (c *ConversationLocalCache) GetConversation(ctx context.Context, userID, co log.ZWarn(ctx, "ConversationLocalCache GetConversation return", err, "userID", userID, "conversationID", conversationID) } }() - var cache cacheProto[pbconv.Conversation] + var cache cacheProto[pbconversation.Conversation] return cache.Unmarshal(c.local.Get(ctx, cachekey.GetConversationKey(userID, conversationID), func(ctx context.Context) ([]byte, error) { log.ZDebug(ctx, "ConversationLocalCache GetConversation rpc", "userID", userID, "conversationID", conversationID) - return cache.Marshal(rpccall.ExtractField(ctx, pbconv.GetConversationCaller.Invoke, &pbconv.GetConversationReq{ - ConversationID: conversationID, - OwnerUserID: userID, - }, (*pbconv.GetConversationResp).GetConversation)) + return cache.Marshal(c.client.GetConversation(ctx, userID, conversationID)) })) } @@ -106,10 +104,10 @@ func (c *ConversationLocalCache) GetSingleConversationRecvMsgOpt(ctx context.Con return conv.RecvMsgOpt, nil } -func (c *ConversationLocalCache) GetConversations(ctx context.Context, ownerUserID string, conversationIDs []string) ([]*pbconv.Conversation, error) { +func (c *ConversationLocalCache) GetConversations(ctx context.Context, ownerUserID string, conversationIDs []string) ([]*pbconversation.Conversation, error) { var ( - conversations = make([]*pbconv.Conversation, 0, len(conversationIDs)) - conversationsChan = make(chan *pbconv.Conversation, len(conversationIDs)) + conversations = make([]*pbconversation.Conversation, 0, len(conversationIDs)) + conversationsChan = make(chan *pbconversation.Conversation, len(conversationIDs)) ) g, ctx := errgroup.WithContext(ctx) @@ -139,7 +137,7 @@ func (c *ConversationLocalCache) GetConversations(ctx context.Context, ownerUser return conversations, nil } -func (c *ConversationLocalCache) getConversationNotReceiveMessageUserIDs(ctx context.Context, conversationID string) (val *pbconv.GetConversationNotReceiveMessageUserIDsResp, err error) { +func (c *ConversationLocalCache) getConversationNotReceiveMessageUserIDs(ctx context.Context, conversationID string) (val *pbconversation.GetConversationNotReceiveMessageUserIDsResp, err error) { log.ZDebug(ctx, "ConversationLocalCache getConversationNotReceiveMessageUserIDs req", "conversationID", conversationID) defer func() { if err == nil { @@ -148,10 +146,10 @@ func (c *ConversationLocalCache) getConversationNotReceiveMessageUserIDs(ctx con log.ZError(ctx, "ConversationLocalCache getConversationNotReceiveMessageUserIDs return", err, "conversationID", conversationID) } }() - var cache cacheProto[pbconv.GetConversationNotReceiveMessageUserIDsResp] + var cache cacheProto[pbconversation.GetConversationNotReceiveMessageUserIDsResp] return cache.Unmarshal(c.local.Get(ctx, cachekey.GetConversationNotReceiveMessageUserIDsKey(conversationID), func(ctx context.Context) ([]byte, error) { log.ZDebug(ctx, "ConversationLocalCache getConversationNotReceiveMessageUserIDs rpc", "conversationID", conversationID) - return cache.Marshal(pbconv.GetConversationNotReceiveMessageUserIDsCaller.Invoke(ctx, &pbconv.GetConversationNotReceiveMessageUserIDsReq{ConversationID: conversationID})) + return cache.Marshal(c.client.ConversationClient.GetConversationNotReceiveMessageUserIDs(ctx, &pbconversation.GetConversationNotReceiveMessageUserIDsReq{ConversationID: conversationID})) })) } diff --git a/pkg/rpccache/friend.go b/pkg/rpccache/friend.go index 865cac7b5..8ed6c1ae9 100644 --- a/pkg/rpccache/friend.go +++ b/pkg/rpccache/friend.go @@ -16,8 +16,8 @@ package rpccache import ( "context" - "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey" + "github.com/openimsdk/open-im-server/v3/pkg/rpcli" "github.com/openimsdk/protocol/relation" "github.com/openimsdk/open-im-server/v3/pkg/common/config" @@ -26,10 +26,11 @@ import ( "github.com/redis/go-redis/v9" ) -func NewFriendLocalCache(localCache *config.LocalCache, cli redis.UniversalClient) *FriendLocalCache { +func NewFriendLocalCache(client *rpcli.RelationClient, localCache *config.LocalCache, cli redis.UniversalClient) *FriendLocalCache { lc := localCache.Friend log.ZDebug(context.Background(), "FriendLocalCache", "topic", lc.Topic, "slotNum", lc.SlotNum, "slotSize", lc.SlotSize, "enable", lc.Enable()) x := &FriendLocalCache{ + client: client, local: localcache.New[[]byte]( localcache.WithLocalSlotNum(lc.SlotNum), localcache.WithLocalSlotSize(lc.SlotSize), @@ -45,7 +46,8 @@ func NewFriendLocalCache(localCache *config.LocalCache, cli redis.UniversalClien } type FriendLocalCache struct { - local localcache.Cache[[]byte] + client *rpcli.RelationClient + local localcache.Cache[[]byte] } func (f *FriendLocalCache) IsFriend(ctx context.Context, possibleFriendUserID, userID string) (val bool, err error) { @@ -68,7 +70,7 @@ func (f *FriendLocalCache) isFriend(ctx context.Context, possibleFriendUserID, u var cache cacheProto[relation.IsFriendResp] return cache.Unmarshal(f.local.GetLink(ctx, cachekey.GetIsFriendKey(possibleFriendUserID, userID), func(ctx context.Context) ([]byte, error) { log.ZDebug(ctx, "FriendLocalCache isFriend rpc", "possibleFriendUserID", possibleFriendUserID, "userID", userID) - return cache.Marshal(relation.IsFriendCaller.Invoke(ctx, &relation.IsFriendReq{UserID1: userID, UserID2: possibleFriendUserID})) + return cache.Marshal(f.client.FriendClient.IsFriend(ctx, &relation.IsFriendReq{UserID1: userID, UserID2: possibleFriendUserID})) }, cachekey.GetFriendIDsKey(possibleFriendUserID))) } @@ -94,6 +96,6 @@ func (f *FriendLocalCache) isBlack(ctx context.Context, possibleBlackUserID, use var cache cacheProto[relation.IsBlackResp] return cache.Unmarshal(f.local.GetLink(ctx, cachekey.GetIsBlackIDsKey(possibleBlackUserID, userID), func(ctx context.Context) ([]byte, error) { log.ZDebug(ctx, "FriendLocalCache IsBlack rpc", "possibleBlackUserID", possibleBlackUserID, "userID", userID) - return cache.Marshal(relation.IsBlackCaller.Invoke(ctx, &relation.IsBlackReq{UserID1: possibleBlackUserID, UserID2: userID})) + return cache.Marshal(f.client.FriendClient.IsBlack(ctx, &relation.IsBlackReq{UserID1: possibleBlackUserID, UserID2: userID})) }, cachekey.GetBlackIDsKey(userID))) } diff --git a/pkg/rpccache/group.go b/pkg/rpccache/group.go index 111813103..174ba7dc5 100644 --- a/pkg/rpccache/group.go +++ b/pkg/rpccache/group.go @@ -16,10 +16,9 @@ package rpccache import ( "context" - "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey" + "github.com/openimsdk/open-im-server/v3/pkg/rpcli" "github.com/openimsdk/protocol/group" - "github.com/openimsdk/protocol/rpccall" "github.com/openimsdk/tools/utils/datautil" "github.com/openimsdk/open-im-server/v3/pkg/common/config" @@ -30,10 +29,11 @@ import ( "github.com/redis/go-redis/v9" ) -func NewGroupLocalCache(localCache *config.LocalCache, cli redis.UniversalClient) *GroupLocalCache { +func NewGroupLocalCache(client *rpcli.GroupClient, localCache *config.LocalCache, cli redis.UniversalClient) *GroupLocalCache { lc := localCache.Group log.ZDebug(context.Background(), "GroupLocalCache", "topic", lc.Topic, "slotNum", lc.SlotNum, "slotSize", lc.SlotSize, "enable", lc.Enable()) x := &GroupLocalCache{ + client: client, local: localcache.New[[]byte]( localcache.WithLocalSlotNum(lc.SlotNum), localcache.WithLocalSlotSize(lc.SlotSize), @@ -49,7 +49,8 @@ func NewGroupLocalCache(localCache *config.LocalCache, cli redis.UniversalClient } type GroupLocalCache struct { - local localcache.Cache[[]byte] + client *rpcli.GroupClient + local localcache.Cache[[]byte] } func (g *GroupLocalCache) getGroupMemberIDs(ctx context.Context, groupID string) (val *group.GetGroupMemberUserIDsResp, err error) { @@ -64,7 +65,7 @@ func (g *GroupLocalCache) getGroupMemberIDs(ctx context.Context, groupID string) var cache cacheProto[group.GetGroupMemberUserIDsResp] return cache.Unmarshal(g.local.Get(ctx, cachekey.GetGroupMemberIDsKey(groupID), func(ctx context.Context) ([]byte, error) { log.ZDebug(ctx, "GroupLocalCache getGroupMemberIDs rpc", "groupID", groupID) - return cache.Marshal(group.GetGroupMemberUserIDsCaller.Invoke(ctx, &group.GetGroupMemberUserIDsReq{GroupID: groupID})) + return cache.Marshal(g.client.GroupClient.GetGroupMemberUserIDs(ctx, &group.GetGroupMemberUserIDsReq{GroupID: groupID})) })) } @@ -80,13 +81,7 @@ func (g *GroupLocalCache) GetGroupMember(ctx context.Context, groupID, userID st var cache cacheProto[sdkws.GroupMemberFullInfo] return cache.Unmarshal(g.local.Get(ctx, cachekey.GetGroupMemberInfoKey(groupID, userID), func(ctx context.Context) ([]byte, error) { log.ZDebug(ctx, "GroupLocalCache GetGroupInfo rpc", "groupID", groupID, "userID", userID) - return cache.Marshal(rpccall.ExtractField(ctx, group.GetGroupMemberCacheCaller.Invoke, - &group.GetGroupMemberCacheReq{ - GroupID: groupID, - GroupMemberID: userID, - }, - (*group.GetGroupMemberCacheResp).GetMember, - )) + return cache.Marshal(g.client.GetGroupMemberCache(ctx, groupID, userID)) })) } @@ -102,10 +97,7 @@ func (g *GroupLocalCache) GetGroupInfo(ctx context.Context, groupID string) (val var cache cacheProto[sdkws.GroupInfo] return cache.Unmarshal(g.local.Get(ctx, cachekey.GetGroupInfoKey(groupID), func(ctx context.Context) ([]byte, error) { log.ZDebug(ctx, "GroupLocalCache GetGroupInfo rpc", "groupID", groupID) - return cache.Marshal(rpccall.ExtractField(ctx, group.GetGroupInfoCacheCaller.Invoke, - &group.GetGroupInfoCacheReq{ - GroupID: groupID, - }, (*group.GetGroupInfoCacheResp).GetGroupInfo)) + return cache.Marshal(g.client.GetGroupInfoCache(ctx, groupID)) })) } diff --git a/pkg/rpccache/online.go b/pkg/rpccache/online.go index 25362b529..8f5323477 100644 --- a/pkg/rpccache/online.go +++ b/pkg/rpccache/online.go @@ -3,16 +3,15 @@ package rpccache import ( "context" "fmt" + "github.com/openimsdk/open-im-server/v3/pkg/rpcli" + "github.com/openimsdk/protocol/constant" + "github.com/openimsdk/protocol/user" "math/rand" "strconv" "sync" "sync/atomic" "time" - "github.com/openimsdk/protocol/constant" - "github.com/openimsdk/protocol/rpccall" - "github.com/openimsdk/protocol/user" - "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey" "github.com/openimsdk/open-im-server/v3/pkg/localcache" "github.com/openimsdk/open-im-server/v3/pkg/localcache/lru" @@ -23,10 +22,10 @@ import ( "github.com/redis/go-redis/v9" ) -func NewOnlineCache(adminUserID []string, group *GroupLocalCache, rdb redis.UniversalClient, fullUserCache bool, fn func(ctx context.Context, userID string, platformIDs []int32)) (*OnlineCache, error) { +func NewOnlineCache(client *rpcli.UserClient, group *GroupLocalCache, rdb redis.UniversalClient, fullUserCache bool, fn func(ctx context.Context, userID string, platformIDs []int32)) (*OnlineCache, error) { l := &sync.Mutex{} x := &OnlineCache{ - adminUserID: adminUserID, + client: client, group: group, fullUserCache: fullUserCache, Lock: l, @@ -66,8 +65,8 @@ const ( ) type OnlineCache struct { - adminUserID []string - group *GroupLocalCache + client *rpcli.UserClient + group *GroupLocalCache // fullUserCache if enabled, caches the online status of all users using mapCache; // otherwise, only a portion of users' online statuses (regardless of whether they are online) will be cached using lruCache. @@ -113,7 +112,7 @@ func (o *OnlineCache) initUsersOnlineStatus(ctx context.Context) (err error) { cursor := uint64(0) for resp == nil || resp.NextCursor != 0 { if err = retryOperation(func() error { - resp, err = user.GetAllOnlineUsersCaller.Invoke(ctx, &user.GetAllOnlineUsersReq{Cursor: cursor}) + resp, err = o.client.GetAllOnlineUsers(ctx, cursor) if err != nil { return err } @@ -187,17 +186,7 @@ func (o *OnlineCache) doSubscribe(ctx context.Context, rdb redis.UniversalClient func (o *OnlineCache) getUserOnlinePlatform(ctx context.Context, userID string) ([]int32, error) { platformIDs, err := o.lruCache.Get(userID, func() ([]int32, error) { - resp, err := rpccall.ExtractField(ctx, user.GetUserStatusCaller.Invoke, &user.GetUserStatusReq{ - UserID: o.adminUserID[0], - UserIDs: []string{userID}, - }, (*user.GetUserStatusResp).GetStatusList) - if err != nil { - return nil, err - } - if len(resp) == 0 { - return nil, nil - } - return resp[0].PlatformIDs, nil + return o.client.GetUserOnlinePlatform(ctx, userID) }) if err != nil { log.ZError(ctx, "OnlineCache GetUserOnlinePlatform", err, "userID", userID) @@ -238,11 +227,7 @@ func (o *OnlineCache) GetUserOnline(ctx context.Context, userID string) (bool, e func (o *OnlineCache) getUserOnlinePlatformBatch(ctx context.Context, userIDs []string) (map[string][]int32, error) { platformIDsMap, err := o.lruCache.GetBatch(userIDs, func(missingUsers []string) (map[string][]int32, error) { platformIDsMap := make(map[string][]int32) - - usersStatus, err := rpccall.ExtractField(ctx, user.GetUserStatusCaller.Invoke, &user.GetUserStatusReq{ - UserID: o.adminUserID[0], - UserIDs: missingUsers, - }, (*user.GetUserStatusResp).GetStatusList) + usersStatus, err := o.client.GetUsersOnlinePlatform(ctx, missingUsers) if err != nil { return nil, err } diff --git a/pkg/rpccache/subscriber.go b/pkg/rpccache/subscriber.go index 44e1f5885..d28d1aa29 100644 --- a/pkg/rpccache/subscriber.go +++ b/pkg/rpccache/subscriber.go @@ -17,7 +17,6 @@ package rpccache import ( "context" "encoding/json" - "github.com/openimsdk/tools/log" "github.com/redis/go-redis/v9" ) diff --git a/pkg/rpccache/user.go b/pkg/rpccache/user.go index fce2c911a..cb0704906 100644 --- a/pkg/rpccache/user.go +++ b/pkg/rpccache/user.go @@ -16,11 +16,11 @@ package rpccache import ( "context" + "github.com/openimsdk/open-im-server/v3/pkg/rpcli" "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey" "github.com/openimsdk/open-im-server/v3/pkg/localcache" - "github.com/openimsdk/open-im-server/v3/pkg/rpcclient" "github.com/openimsdk/protocol/sdkws" "github.com/openimsdk/protocol/user" "github.com/openimsdk/tools/errs" @@ -28,10 +28,11 @@ import ( "github.com/redis/go-redis/v9" ) -func NewUserLocalCache(localCache *config.LocalCache, cli redis.UniversalClient) *UserLocalCache { +func NewUserLocalCache(client *rpcli.UserClient, localCache *config.LocalCache, cli redis.UniversalClient) *UserLocalCache { lc := localCache.User log.ZDebug(context.Background(), "UserLocalCache", "topic", lc.Topic, "slotNum", lc.SlotNum, "slotSize", lc.SlotSize, "enable", lc.Enable()) x := &UserLocalCache{ + client: client, local: localcache.New[[]byte]( localcache.WithLocalSlotNum(lc.SlotNum), localcache.WithLocalSlotSize(lc.SlotSize), @@ -47,7 +48,8 @@ func NewUserLocalCache(localCache *config.LocalCache, cli redis.UniversalClient) } type UserLocalCache struct { - local localcache.Cache[[]byte] + client *rpcli.UserClient + local localcache.Cache[[]byte] } func (u *UserLocalCache) GetUserInfo(ctx context.Context, userID string) (val *sdkws.UserInfo, err error) { @@ -62,7 +64,7 @@ func (u *UserLocalCache) GetUserInfo(ctx context.Context, userID string) (val *s var cache cacheProto[sdkws.UserInfo] return cache.Unmarshal(u.local.Get(ctx, cachekey.GetUserInfoKey(userID), func(ctx context.Context) ([]byte, error) { log.ZDebug(ctx, "UserLocalCache GetUserInfo rpc", "userID", userID) - return cache.Marshal(rpcclient.GetUserInfo(ctx, userID)) + return cache.Marshal(u.client.GetUserInfo(ctx, userID)) })) } @@ -86,7 +88,7 @@ func (u *UserLocalCache) getUserGlobalMsgRecvOpt(ctx context.Context, userID str var cache cacheProto[user.GetGlobalRecvMessageOptResp] return cache.Unmarshal(u.local.Get(ctx, cachekey.GetUserGlobalRecvMsgOptKey(userID), func(ctx context.Context) ([]byte, error) { log.ZDebug(ctx, "UserLocalCache GetUserGlobalMsgRecvOpt rpc", "userID", userID) - return cache.Marshal(user.GetGlobalRecvMessageOptCaller.Invoke(ctx, &user.GetGlobalRecvMessageOptReq{UserID: userID})) + return cache.Marshal(u.client.UserClient.GetGlobalRecvMessageOpt(ctx, &user.GetGlobalRecvMessageOptReq{UserID: userID})) })) } diff --git a/pkg/rpcclient/init.go b/pkg/rpcclient/init.go index 3d3f68aef..7fc090661 100644 --- a/pkg/rpcclient/init.go +++ b/pkg/rpcclient/init.go @@ -1,60 +1,60 @@ package rpcclient -import ( - "context" - - "github.com/openimsdk/open-im-server/v3/pkg/common/config" - pbauth "github.com/openimsdk/protocol/auth" - pbconversation "github.com/openimsdk/protocol/conversation" - pbgroup "github.com/openimsdk/protocol/group" - pbmsg "github.com/openimsdk/protocol/msg" - pbmsggateway "github.com/openimsdk/protocol/msggateway" - pbpush "github.com/openimsdk/protocol/push" - pbrelation "github.com/openimsdk/protocol/relation" - pbthird "github.com/openimsdk/protocol/third" - pbuser "github.com/openimsdk/protocol/user" - "github.com/openimsdk/tools/discovery" - "github.com/openimsdk/tools/system/program" - "google.golang.org/grpc" -) - -func InitRpcCaller(discov discovery.SvcDiscoveryRegistry, service config.RpcService) error { - initConn := func(discov discovery.SvcDiscoveryRegistry, name string, initFunc func(conn *grpc.ClientConn)) error { - conn, err := discov.GetConn(context.Background(), name) - if err != nil { - program.ExitWithError(err) - return err - } - initFunc(conn) - return nil - } - if err := initConn(discov, service.Auth, pbauth.InitAuth); err != nil { - return err - } - if err := initConn(discov, service.Conversation, pbconversation.InitConversation); err != nil { - return err - } - if err := initConn(discov, service.Group, pbgroup.InitGroup); err != nil { - return err - } - if err := initConn(discov, service.Msg, pbmsg.InitMsg); err != nil { - return err - } - if err := initConn(discov, service.MessageGateway, pbmsggateway.InitMsgGateway); err != nil { - return err - } - if err := initConn(discov, service.Push, pbpush.InitPushMsgService); err != nil { - return err - } - if err := initConn(discov, service.Friend, pbrelation.InitFriend); err != nil { - return err - } - if err := initConn(discov, service.Third, pbthird.InitThird); err != nil { - return err - } - if err := initConn(discov, service.User, pbuser.InitUser); err != nil { - return err - } - - return nil -} +//import ( +// "context" +// +// "github.com/openimsdk/open-im-server/v3/pkg/common/config" +// pbauth "github.com/openimsdk/protocol/auth" +// pbconversation "github.com/openimsdk/protocol/conversation" +// pbgroup "github.com/openimsdk/protocol/group" +// pbmsg "github.com/openimsdk/protocol/msg" +// pbmsggateway "github.com/openimsdk/protocol/msggateway" +// pbpush "github.com/openimsdk/protocol/push" +// pbrelation "github.com/openimsdk/protocol/relation" +// pbthird "github.com/openimsdk/protocol/third" +// pbuser "github.com/openimsdk/protocol/user" +// "github.com/openimsdk/tools/discovery" +// "github.com/openimsdk/tools/system/program" +// "google.golang.org/grpc" +//) +// +//func InitRpcCaller(discov discovery.SvcDiscoveryRegistry, service config.RpcService) error { +// initConn := func(discov discovery.SvcDiscoveryRegistry, name string, initFunc func(conn *grpc.ClientConn)) error { +// conn, err := discov.GetConn(context.Background(), name) +// if err != nil { +// program.ExitWithError(err) +// return err +// } +// initFunc(conn) +// return nil +// } +// if err := initConn(discov, service.Auth, pbauth.InitAuth); err != nil { +// return err +// } +// if err := initConn(discov, service.Conversation, pbconversation.InitConversation); err != nil { +// return err +// } +// if err := initConn(discov, service.Group, pbgroup.InitGroup); err != nil { +// return err +// } +// if err := initConn(discov, service.Msg, pbmsg.InitMsg); err != nil { +// return err +// } +// if err := initConn(discov, service.MessageGateway, pbmsggateway.InitMsgGateway); err != nil { +// return err +// } +// if err := initConn(discov, service.Push, pbpush.InitPushMsgService); err != nil { +// return err +// } +// if err := initConn(discov, service.Friend, pbrelation.InitFriend); err != nil { +// return err +// } +// if err := initConn(discov, service.Third, pbthird.InitThird); err != nil { +// return err +// } +// if err := initConn(discov, service.User, pbuser.InitUser); err != nil { +// return err +// } +// +// return nil +//} diff --git a/pkg/rpcclient/msg.go b/pkg/rpcclient/msg.go index f0f0be40f..253ec3559 100644 --- a/pkg/rpcclient/msg.go +++ b/pkg/rpcclient/msg.go @@ -151,17 +151,17 @@ func WithLocalSendMsg(sendMsg func(ctx context.Context, req *msg.SendMsgReq) (*m } } -func WithRpcClient() NotificationSenderOptions { +func WithRpcClient(sendMsg func(ctx context.Context, req *msg.SendMsgReq) (*msg.SendMsgResp, error)) NotificationSenderOptions { return func(s *NotificationSender) { s.sendMsg = func(ctx context.Context, req *msg.SendMsgReq) (*msg.SendMsgResp, error) { - return s.msgClient.SendMsg(ctx, req) + return sendMsg(ctx, req) } } } -func WithUserRpcClient() NotificationSenderOptions { +func WithUserRpcClient(getUserInfo func(ctx context.Context, userID string) (*sdkws.UserInfo, error)) NotificationSenderOptions { return func(s *NotificationSender) { - s.getUserInfo = GetUserInfo + s.getUserInfo = getUserInfo } } diff --git a/pkg/rpcclient/user.go b/pkg/rpcclient/user.go index 463dd9a39..8d1b4efcb 100644 --- a/pkg/rpcclient/user.go +++ b/pkg/rpcclient/user.go @@ -14,96 +14,97 @@ package rpcclient -import ( - "context" - "strings" - - "github.com/openimsdk/open-im-server/v3/pkg/common/servererrs" - "github.com/openimsdk/protocol/sdkws" - "github.com/openimsdk/protocol/user" - "github.com/openimsdk/tools/utils/datautil" -) - -// GetUsersInfo retrieves information for multiple users based on their user IDs. -func GetUsersInfo(ctx context.Context, userIDs []string) ([]*sdkws.UserInfo, error) { - if len(userIDs) == 0 { - return []*sdkws.UserInfo{}, nil - } - resp, err := user.GetDesignateUsersCaller.Invoke(ctx, &user.GetDesignateUsersReq{ - UserIDs: userIDs, - }) - if err != nil { - return nil, err - } - if ids := datautil.Single(userIDs, datautil.Slice(resp.UsersInfo, func(e *sdkws.UserInfo) string { - return e.UserID - })); len(ids) > 0 { - return nil, servererrs.ErrUserIDNotFound.WrapMsg(strings.Join(ids, ",")) - } - return resp.UsersInfo, nil -} - -// GetUserInfo retrieves information for a single user based on the provided user ID. -func GetUserInfo(ctx context.Context, userID string) (*sdkws.UserInfo, error) { - users, err := GetUsersInfo(ctx, []string{userID}) - if err != nil { - return nil, err - } - return users[0], nil -} - -// GetUsersInfoMap retrieves a map of user information indexed by their user IDs. -func GetUsersInfoMap(ctx context.Context, userIDs []string) (map[string]*sdkws.UserInfo, error) { - users, err := GetUsersInfo(ctx, userIDs) - if err != nil { - return nil, err - } - return datautil.SliceToMap(users, func(e *sdkws.UserInfo) string { - return e.UserID - }), nil -} - -// GetPublicUserInfos retrieves public information for multiple users based on their user IDs. -func GetPublicUserInfos( - ctx context.Context, - userIDs []string, -) ([]*sdkws.PublicUserInfo, error) { - users, err := GetUsersInfo(ctx, userIDs) - if err != nil { - return nil, err - } - - return datautil.Slice(users, func(e *sdkws.UserInfo) *sdkws.PublicUserInfo { - return &sdkws.PublicUserInfo{ - UserID: e.UserID, - Nickname: e.Nickname, - FaceURL: e.FaceURL, - Ex: e.Ex, - } - }), nil -} - -// GetPublicUserInfo retrieves public information for a single user based on the provided user ID. -func GetPublicUserInfo(ctx context.Context, userID string) (*sdkws.PublicUserInfo, error) { - users, err := GetPublicUserInfos(ctx, []string{userID}) - if err != nil { - return nil, err - } - - return users[0], nil -} - -// GetPublicUserInfoMap retrieves a map of public user information indexed by their user IDs. -func GetPublicUserInfoMap( - ctx context.Context, - userIDs []string, -) (map[string]*sdkws.PublicUserInfo, error) { - users, err := GetPublicUserInfos(ctx, userIDs) - if err != nil { - return nil, err - } - - return datautil.SliceToMap(users, func(e *sdkws.PublicUserInfo) string { - return e.UserID - }), nil -} +// +//import ( +// "context" +// "strings" +// +// "github.com/openimsdk/open-im-server/v3/pkg/common/servererrs" +// "github.com/openimsdk/protocol/sdkws" +// "github.com/openimsdk/protocol/user" +// "github.com/openimsdk/tools/utils/datautil" +//) +// +//// GetUsersInfo retrieves information for multiple users based on their user IDs. +//func GetUsersInfo(ctx context.Context, userIDs []string) ([]*sdkws.UserInfo, error) { +// if len(userIDs) == 0 { +// return []*sdkws.UserInfo{}, nil +// } +// resp, err := user.GetDesignateUsersCaller.Invoke(ctx, &user.GetDesignateUsersReq{ +// UserIDs: userIDs, +// }) +// if err != nil { +// return nil, err +// } +// if ids := datautil.Single(userIDs, datautil.Slice(resp.UsersInfo, func(e *sdkws.UserInfo) string { +// return e.UserID +// })); len(ids) > 0 { +// return nil, servererrs.ErrUserIDNotFound.WrapMsg(strings.Join(ids, ",")) +// } +// return resp.UsersInfo, nil +//} +// +//// GetUserInfo retrieves information for a single user based on the provided user ID. +//func GetUserInfo(ctx context.Context, userID string) (*sdkws.UserInfo, error) { +// users, err := GetUsersInfo(ctx, []string{userID}) +// if err != nil { +// return nil, err +// } +// return users[0], nil +//} +// +//// GetUsersInfoMap retrieves a map of user information indexed by their user IDs. +//func GetUsersInfoMap(ctx context.Context, userIDs []string) (map[string]*sdkws.UserInfo, error) { +// users, err := GetUsersInfo(ctx, userIDs) +// if err != nil { +// return nil, err +// } +// return datautil.SliceToMap(users, func(e *sdkws.UserInfo) string { +// return e.UserID +// }), nil +//} +// +//// GetPublicUserInfos retrieves public information for multiple users based on their user IDs. +//func GetPublicUserInfos( +// ctx context.Context, +// userIDs []string, +//) ([]*sdkws.PublicUserInfo, error) { +// users, err := GetUsersInfo(ctx, userIDs) +// if err != nil { +// return nil, err +// } +// +// return datautil.Slice(users, func(e *sdkws.UserInfo) *sdkws.PublicUserInfo { +// return &sdkws.PublicUserInfo{ +// UserID: e.UserID, +// Nickname: e.Nickname, +// FaceURL: e.FaceURL, +// Ex: e.Ex, +// } +// }), nil +//} +// +//// GetPublicUserInfo retrieves public information for a single user based on the provided user ID. +//func GetPublicUserInfo(ctx context.Context, userID string) (*sdkws.PublicUserInfo, error) { +// users, err := GetPublicUserInfos(ctx, []string{userID}) +// if err != nil { +// return nil, err +// } +// +// return users[0], nil +//} +// +//// GetPublicUserInfoMap retrieves a map of public user information indexed by their user IDs. +//func GetPublicUserInfoMap( +// ctx context.Context, +// userIDs []string, +//) (map[string]*sdkws.PublicUserInfo, error) { +// users, err := GetPublicUserInfos(ctx, userIDs) +// if err != nil { +// return nil, err +// } +// +// return datautil.SliceToMap(users, func(e *sdkws.PublicUserInfo) string { +// return e.UserID +// }), nil +//} diff --git a/pkg/rpcli/auth.go b/pkg/rpcli/auth.go index a159eb955..1f9731c21 100644 --- a/pkg/rpcli/auth.go +++ b/pkg/rpcli/auth.go @@ -1,9 +1,12 @@ package rpcli -import "github.com/openimsdk/protocol/auth" +import ( + "github.com/openimsdk/protocol/auth" + "google.golang.org/grpc" +) -func NewAuthClient(cli auth.AuthClient) *AuthClient { - return &AuthClient{cli} +func NewAuthClient(cc grpc.ClientConnInterface) *AuthClient { + return &AuthClient{auth.NewAuthClient(cc)} } type AuthClient struct { diff --git a/pkg/rpcli/conversation.go b/pkg/rpcli/conversation.go index c099500cf..b1409c923 100644 --- a/pkg/rpcli/conversation.go +++ b/pkg/rpcli/conversation.go @@ -3,10 +3,11 @@ package rpcli import ( "context" "github.com/openimsdk/protocol/conversation" + "google.golang.org/grpc" ) -func NewConversationClient(cli conversation.ConversationClient) *ConversationClient { - return &ConversationClient{cli} +func NewConversationClient(cc grpc.ClientConnInterface) *ConversationClient { + return &ConversationClient{conversation.NewConversationClient(cc)} } type ConversationClient struct { @@ -46,3 +47,27 @@ func (x *ConversationClient) GetConversations(ctx context.Context, conversationI req := &conversation.GetConversationsReq{ConversationIDs: conversationIDs, OwnerUserID: ownerUserID} return extractField(ctx, x.ConversationClient.GetConversations, req, (*conversation.GetConversationsResp).GetConversations) } + +func (x *ConversationClient) GetConversationIDs(ctx context.Context, ownerUserID string) ([]string, error) { + req := &conversation.GetConversationIDsReq{UserID: ownerUserID} + return extractField(ctx, x.ConversationClient.GetConversationIDs, req, (*conversation.GetConversationIDsResp).GetConversationIDs) +} + +func (x *ConversationClient) GetPinnedConversationIDs(ctx context.Context, ownerUserID string) ([]string, error) { + req := &conversation.GetPinnedConversationIDsReq{UserID: ownerUserID} + return extractField(ctx, x.ConversationClient.GetPinnedConversationIDs, req, (*conversation.GetPinnedConversationIDsResp).GetConversationIDs) +} + +func (x *ConversationClient) CreateGroupChatConversations(ctx context.Context, groupID string, userIDs []string) error { + req := &conversation.CreateGroupChatConversationsReq{GroupID: groupID, UserIDs: userIDs} + return ignoreResp(x.ConversationClient.CreateGroupChatConversations(ctx, req)) +} + +func (x *ConversationClient) CreateSingleChatConversations(ctx context.Context, req *conversation.CreateSingleChatConversationsReq) error { + return ignoreResp(x.ConversationClient.CreateSingleChatConversations(ctx, req)) +} + +func (x *ConversationClient) GetConversationOfflinePushUserIDs(ctx context.Context, conversationID string, userIDs []string) ([]string, error) { + req := &conversation.GetConversationOfflinePushUserIDsReq{ConversationID: conversationID, UserIDs: userIDs} + return extractField(ctx, x.ConversationClient.GetConversationOfflinePushUserIDs, req, (*conversation.GetConversationOfflinePushUserIDsResp).GetUserIDs) +} diff --git a/pkg/rpcli/group.go b/pkg/rpcli/group.go index 763a92dfd..7965732b3 100644 --- a/pkg/rpcli/group.go +++ b/pkg/rpcli/group.go @@ -4,25 +4,42 @@ import ( "context" "github.com/openimsdk/protocol/group" "github.com/openimsdk/protocol/sdkws" + "google.golang.org/grpc" ) -func NewGroupClient(cli group.GroupClient) *GroupClient { - return &GroupClient{cli} +func NewGroupClient(cc grpc.ClientConnInterface) *GroupClient { + return &GroupClient{group.NewGroupClient(cc)} } type GroupClient struct { group.GroupClient } -func (x *GroupClient) cli() group.GroupClient { - return x.GroupClient -} - func (x *GroupClient) GetGroupsInfo(ctx context.Context, groupIDs []string) ([]*sdkws.GroupInfo, error) { req := &group.GetGroupsInfoReq{GroupIDs: groupIDs} - return extractField(ctx, x.cli().GetGroupsInfo, req, (*group.GetGroupsInfoResp).GetGroupInfos) + return extractField(ctx, x.GroupClient.GetGroupsInfo, req, (*group.GetGroupsInfoResp).GetGroupInfos) } func (x *GroupClient) GetGroupInfo(ctx context.Context, groupID string) (*sdkws.GroupInfo, error) { return firstValue(x.GetGroupsInfo(ctx, []string{groupID})) } + +func (x *GroupClient) GetGroupInfoCache(ctx context.Context, groupID string) (*sdkws.GroupInfo, error) { + req := &group.GetGroupInfoCacheReq{GroupID: groupID} + return extractField(ctx, x.GroupClient.GetGroupInfoCache, req, (*group.GetGroupInfoCacheResp).GetGroupInfo) +} + +func (x *GroupClient) GetGroupMemberCache(ctx context.Context, groupID string, userID string) (*sdkws.GroupMemberFullInfo, error) { + req := &group.GetGroupMemberCacheReq{GroupID: groupID} + return extractField(ctx, x.GroupClient.GetGroupMemberCache, req, (*group.GetGroupMemberCacheResp).GetMember) +} + +func (x *GroupClient) DismissGroup(ctx context.Context, groupID string, deleteMember bool) error { + req := &group.DismissGroupReq{GroupID: groupID, DeleteMember: deleteMember} + return ignoreResp(x.GroupClient.DismissGroup(ctx, req)) +} + +func (x *GroupClient) GetGroupMemberUserIDs(ctx context.Context, groupID string) ([]string, error) { + req := &group.GetGroupMemberUserIDsReq{GroupID: groupID} + return extractField(ctx, x.GroupClient.GetGroupMemberUserIDs, req, (*group.GetGroupMemberUserIDsResp).GetUserIDs) +} diff --git a/pkg/rpcli/msg.go b/pkg/rpcli/msg.go index 513ed80af..cd540bc36 100644 --- a/pkg/rpcli/msg.go +++ b/pkg/rpcli/msg.go @@ -4,51 +4,63 @@ import ( "context" "github.com/openimsdk/protocol/msg" "github.com/openimsdk/protocol/sdkws" + "google.golang.org/grpc" ) -func NewMsgClient(cli msg.MsgClient) *MsgClient { - return &MsgClient{cli} +func NewMsgClient(cc grpc.ClientConnInterface) *MsgClient { + return &MsgClient{msg.NewMsgClient(cc)} } type MsgClient struct { msg.MsgClient } -func (x *MsgClient) cli() msg.MsgClient { - return x.MsgClient -} - func (x *MsgClient) GetMaxSeqs(ctx context.Context, conversationIDs []string) (map[string]int64, error) { req := &msg.GetMaxSeqsReq{ConversationIDs: conversationIDs} - return extractField(ctx, x.cli().GetMaxSeqs, req, (*msg.SeqsInfoResp).GetMaxSeqs) + return extractField(ctx, x.MsgClient.GetMaxSeqs, req, (*msg.SeqsInfoResp).GetMaxSeqs) } func (x *MsgClient) GetMsgByConversationIDs(ctx context.Context, conversationIDs []string, maxSeqs map[string]int64) (map[string]*sdkws.MsgData, error) { req := &msg.GetMsgByConversationIDsReq{ConversationIDs: conversationIDs, MaxSeqs: maxSeqs} - return extractField(ctx, x.cli().GetMsgByConversationIDs, req, (*msg.GetMsgByConversationIDsResp).GetMsgDatas) + return extractField(ctx, x.MsgClient.GetMsgByConversationIDs, req, (*msg.GetMsgByConversationIDsResp).GetMsgDatas) } func (x *MsgClient) GetHasReadSeqs(ctx context.Context, conversationIDs []string, userID string) (map[string]int64, error) { req := &msg.GetHasReadSeqsReq{ConversationIDs: conversationIDs, UserID: userID} - return extractField(ctx, x.cli().GetHasReadSeqs, req, (*msg.SeqsInfoResp).GetMaxSeqs) + return extractField(ctx, x.MsgClient.GetHasReadSeqs, req, (*msg.SeqsInfoResp).GetMaxSeqs) } func (x *MsgClient) SetUserConversationMaxSeq(ctx context.Context, conversationID string, ownerUserIDs []string, maxSeq int64) error { req := &msg.SetUserConversationMaxSeqReq{ConversationID: conversationID, OwnerUserID: ownerUserIDs, MaxSeq: maxSeq} - return ignoreResp(x.cli().SetUserConversationMaxSeq(ctx, req)) + return ignoreResp(x.MsgClient.SetUserConversationMaxSeq(ctx, req)) } func (x *MsgClient) SetUserConversationMin(ctx context.Context, conversationID string, ownerUserIDs []string, minSeq int64) error { req := &msg.SetUserConversationsMinSeqReq{ConversationID: conversationID, UserIDs: ownerUserIDs, Seq: minSeq} - return ignoreResp(x.cli().SetUserConversationsMinSeq(ctx, req)) + return ignoreResp(x.MsgClient.SetUserConversationsMinSeq(ctx, req)) } func (x *MsgClient) GetLastMessageSeqByTime(ctx context.Context, conversationID string, lastTime int64) (int64, error) { req := &msg.GetLastMessageSeqByTimeReq{ConversationID: conversationID, Time: lastTime} - return extractField(ctx, x.cli().GetLastMessageSeqByTime, req, (*msg.GetLastMessageSeqByTimeResp).GetSeq) + return extractField(ctx, x.MsgClient.GetLastMessageSeqByTime, req, (*msg.GetLastMessageSeqByTimeResp).GetSeq) } func (x *MsgClient) GetConversationMaxSeq(ctx context.Context, conversationID string) (int64, error) { req := &msg.GetConversationMaxSeqReq{ConversationID: conversationID} - return extractField(ctx, x.cli().GetConversationMaxSeq, req, (*msg.GetConversationMaxSeqResp).GetMaxSeq) + return extractField(ctx, x.MsgClient.GetConversationMaxSeq, req, (*msg.GetConversationMaxSeqResp).GetMaxSeq) +} + +func (x *MsgClient) GetActiveConversation(ctx context.Context, conversationIDs []string) ([]*msg.ActiveConversation, error) { + req := &msg.GetActiveConversationReq{ConversationIDs: conversationIDs} + return extractField(ctx, x.MsgClient.GetActiveConversation, req, (*msg.GetActiveConversationResp).GetConversations) +} + +func (x *MsgClient) GetSeqMessage(ctx context.Context, userID string, conversations []*msg.ConversationSeqs) (map[string]*sdkws.PullMsgs, error) { + req := &msg.GetSeqMessageReq{UserID: userID, Conversations: conversations} + return extractField(ctx, x.MsgClient.GetSeqMessage, req, (*msg.GetSeqMessageResp).GetMsgs) +} + +func (x *MsgClient) SetUserConversationsMinSeq(ctx context.Context, conversationID string, userIDs []string, seq int64) error { + req := &msg.SetUserConversationsMinSeqReq{ConversationID: conversationID, UserIDs: userIDs, Seq: seq} + return ignoreResp(x.MsgClient.SetUserConversationsMinSeq(ctx, req)) } diff --git a/pkg/rpcli/msggateway.go b/pkg/rpcli/msggateway.go index 2b25b50e9..3f19963d5 100644 --- a/pkg/rpcli/msggateway.go +++ b/pkg/rpcli/msggateway.go @@ -2,10 +2,11 @@ package rpcli import ( "github.com/openimsdk/protocol/msggateway" + "google.golang.org/grpc" ) -func NewMsgGatewayClient(cli msggateway.MsgGatewayClient) *MsgGatewayClient { - return &MsgGatewayClient{cli} +func NewMsgGatewayClient(cc grpc.ClientConnInterface) *MsgGatewayClient { + return &MsgGatewayClient{msggateway.NewMsgGatewayClient(cc)} } type MsgGatewayClient struct { diff --git a/pkg/rpcli/push.go b/pkg/rpcli/push.go index 7e928daa5..bb33660e4 100644 --- a/pkg/rpcli/push.go +++ b/pkg/rpcli/push.go @@ -2,10 +2,11 @@ package rpcli import ( "github.com/openimsdk/protocol/push" + "google.golang.org/grpc" ) -func NewPushMsgServiceClient(cli push.PushMsgServiceClient) *PushMsgServiceClient { - return &PushMsgServiceClient{cli} +func NewPushMsgServiceClient(cc grpc.ClientConnInterface) *PushMsgServiceClient { + return &PushMsgServiceClient{push.NewPushMsgServiceClient(cc)} } type PushMsgServiceClient struct { diff --git a/pkg/rpcli/relation.go b/pkg/rpcli/relation.go index 02a591b84..8abdbc591 100644 --- a/pkg/rpcli/relation.go +++ b/pkg/rpcli/relation.go @@ -1,11 +1,20 @@ package rpcli -import "github.com/openimsdk/protocol/relation" +import ( + "context" + "github.com/openimsdk/protocol/relation" + "google.golang.org/grpc" +) -func NewRelationClient(cli relation.FriendClient) *RelationClient { - return &RelationClient{cli} +func NewRelationClient(cc grpc.ClientConnInterface) *RelationClient { + return &RelationClient{relation.NewFriendClient(cc)} } type RelationClient struct { relation.FriendClient } + +func (x *RelationClient) GetFriendsInfo(ctx context.Context, ownerUserID string, friendUserIDs []string) ([]*relation.FriendInfoOnly, error) { + req := &relation.GetFriendInfoReq{OwnerUserID: ownerUserID, FriendUserIDs: friendUserIDs} + return extractField(ctx, x.FriendClient.GetFriendInfo, req, (*relation.GetFriendInfoResp).GetFriendInfos) +} diff --git a/pkg/rpcli/rtc.go b/pkg/rpcli/rtc.go index 1c9973619..18a79d6b4 100644 --- a/pkg/rpcli/rtc.go +++ b/pkg/rpcli/rtc.go @@ -2,10 +2,11 @@ package rpcli import ( "github.com/openimsdk/protocol/rtc" + "google.golang.org/grpc" ) -func NewRtcServiceClient(cli rtc.RtcServiceClient) *RtcServiceClient { - return &RtcServiceClient{cli} +func NewRtcServiceClient(cc grpc.ClientConnInterface) *RtcServiceClient { + return &RtcServiceClient{rtc.NewRtcServiceClient(cc)} } type RtcServiceClient struct { diff --git a/pkg/rpcli/third.go b/pkg/rpcli/third.go index 7ac0e84a7..cbb28ff34 100644 --- a/pkg/rpcli/third.go +++ b/pkg/rpcli/third.go @@ -1,9 +1,12 @@ package rpcli -import "github.com/openimsdk/protocol/third" +import ( + "github.com/openimsdk/protocol/third" + "google.golang.org/grpc" +) -func NewThirdClient(cli third.ThirdClient) *ThirdClient { - return &ThirdClient{cli} +func NewThirdClient(cc grpc.ClientConnInterface) *ThirdClient { + return &ThirdClient{third.NewThirdClient(cc)} } type ThirdClient struct { diff --git a/pkg/rpcli/user.go b/pkg/rpcli/user.go index c9a72e7e3..77d8c53c4 100644 --- a/pkg/rpcli/user.go +++ b/pkg/rpcli/user.go @@ -1,11 +1,73 @@ package rpcli -import "github.com/openimsdk/protocol/user" +import ( + "context" + "github.com/openimsdk/protocol/sdkws" + "github.com/openimsdk/protocol/user" + "github.com/openimsdk/tools/errs" + "github.com/openimsdk/tools/utils/datautil" + "google.golang.org/grpc" +) -func NewUserClient(cli user.UserClient) *UserClient { - return &UserClient{cli} +func NewUserClient(cc grpc.ClientConnInterface) *UserClient { + return &UserClient{user.NewUserClient(cc)} } type UserClient struct { user.UserClient } + +func (x *UserClient) GetUsersInfo(ctx context.Context, userIDs []string) ([]*sdkws.UserInfo, error) { + req := &user.GetDesignateUsersReq{UserIDs: userIDs} + return extractField(ctx, x.UserClient.GetDesignateUsers, req, (*user.GetDesignateUsersResp).GetUsersInfo) +} + +func (x *UserClient) GetUserInfo(ctx context.Context, userID string) (*sdkws.UserInfo, error) { + return firstValue(x.GetUsersInfo(ctx, []string{userID})) +} + +func (x *UserClient) CheckUser(ctx context.Context, userIDs []string) error { + users, err := x.GetUsersInfo(ctx, userIDs) + if err != nil { + return err + } + if len(users) != len(userIDs) { + return errs.ErrRecordNotFound.WrapMsg("user not found") + } + return nil +} + +func (x *UserClient) GetUsersInfoMap(ctx context.Context, userIDs []string) (map[string]*sdkws.UserInfo, error) { + users, err := x.GetUsersInfo(ctx, userIDs) + if err != nil { + return nil, err + } + return datautil.SliceToMap(users, func(e *sdkws.UserInfo) string { + return e.UserID + }), nil +} + +func (x *UserClient) GetAllOnlineUsers(ctx context.Context, cursor uint64) (*user.GetAllOnlineUsersResp, error) { + req := &user.GetAllOnlineUsersReq{Cursor: cursor} + return x.UserClient.GetAllOnlineUsers(ctx, req) +} + +func (x *UserClient) GetUsersOnlinePlatform(ctx context.Context, userIDs []string) ([]*user.OnlineStatus, error) { + if len(userIDs) == 0 { + return nil, nil + } + req := &user.GetUserStatusReq{UserIDs: userIDs} + return extractField(ctx, x.UserClient.GetUserStatus, req, (*user.GetUserStatusResp).GetStatusList) + +} + +func (x *UserClient) GetUserOnlinePlatform(ctx context.Context, userID string) ([]int32, error) { + status, err := x.GetUsersOnlinePlatform(ctx, []string{userID}) + if err != nil { + return nil, err + } + if len(status) == 0 { + return nil, nil + } + return status[0].PlatformIDs, nil +}