From 1f6f8964dd0f5f748d0ba8749f7b89b1ba7bc75b Mon Sep 17 00:00:00 2001 From: Gordon <46924906+FGadvancer@users.noreply.github.com> Date: Tue, 26 Mar 2024 19:51:43 +0800 Subject: [PATCH] fix: discov update. --- internal/push/push_to_client.go | 35 ++++++++++++++++++--------------- 1 file changed, 19 insertions(+), 16 deletions(-) diff --git a/internal/push/push_to_client.go b/internal/push/push_to_client.go index d96eb0d81..49b4cd501 100644 --- a/internal/push/push_to_client.go +++ b/internal/push/push_to_client.go @@ -18,6 +18,10 @@ import ( "context" "encoding/json" "errors" + "github.com/openimsdk/open-im-server/v3/pkg/util/conversationutil" + "github.com/openimsdk/tools/utils/datautil" + "github.com/openimsdk/tools/utils/jsonutil" + "github.com/openimsdk/tools/utils/stringutil" "sync" "github.com/openimsdk/open-im-server/v3/internal/push/offlinepush" @@ -36,10 +40,9 @@ import ( "github.com/openimsdk/protocol/conversation" "github.com/openimsdk/protocol/msggateway" "github.com/openimsdk/protocol/sdkws" - "github.com/openimsdk/tools/discoveryregistry" + "github.com/openimsdk/tools/discovery" "github.com/openimsdk/tools/log" "github.com/openimsdk/tools/mcontext" - "github.com/openimsdk/tools/utils" "golang.org/x/sync/errgroup" "google.golang.org/grpc" ) @@ -47,7 +50,7 @@ import ( type Pusher struct { config *config.GlobalConfig database controller.PushDatabase - discov discoveryregistry.SvcDiscoveryRegistry + discov discovery.SvcDiscoveryRegistry offlinePusher offlinepush.OfflinePusher groupLocalCache *rpccache.GroupLocalCache conversationLocalCache *rpccache.ConversationLocalCache @@ -58,7 +61,7 @@ type Pusher struct { var errNoOfflinePusher = errors.New("no offlinePusher is configured") -func NewPusher(config *config.GlobalConfig, discov discoveryregistry.SvcDiscoveryRegistry, offlinePusher offlinepush.OfflinePusher, database controller.PushDatabase, +func NewPusher(config *config.GlobalConfig, discov discovery.SvcDiscoveryRegistry, offlinePusher offlinepush.OfflinePusher, database controller.PushDatabase, groupLocalCache *rpccache.GroupLocalCache, conversationLocalCache *rpccache.ConversationLocalCache, conversationRpcClient *rpcclient.ConversationRpcClient, groupRpcClient *rpcclient.GroupRpcClient, msgRpcClient *rpcclient.MessageRpcClient, ) *Pusher { @@ -110,7 +113,7 @@ func (p *Pusher) Push2User(ctx context.Context, userIDs []string, msg *sdkws.Msg return err } - isOfflinePush := utils.GetSwitchFromOptions(msg.Options, constant.IsOfflinePush) + isOfflinePush := datautil.GetSwitchFromOptions(msg.Options, constant.IsOfflinePush) log.ZDebug(ctx, "push_result", "ws push result", wsResults, "sendData", msg, "isOfflinePush", isOfflinePush, "push_to_userID", userIDs) if !isOfflinePush { @@ -120,10 +123,10 @@ func (p *Pusher) Push2User(ctx context.Context, userIDs []string, msg *sdkws.Msg if len(wsResults) == 0 { return nil } - onlinePushSuccUserIDSet := utils.SliceSet(utils.Filter(wsResults, func(e *msggateway.SingleMsgToUserResults) (string, bool) { + onlinePushSuccUserIDSet := datautil.SliceSet(datautil.Filter(wsResults, func(e *msggateway.SingleMsgToUserResults) (string, bool) { return e.UserID, e.OnlinePush && e.UserID != "" })) - offlinePushUserIDList := utils.Filter(wsResults, func(e *msggateway.SingleMsgToUserResults) (string, bool) { + offlinePushUserIDList := datautil.Filter(wsResults, func(e *msggateway.SingleMsgToUserResults) (string, bool) { _, exist := onlinePushSuccUserIDSet[e.UserID] return e.UserID, !exist && e.UserID != "" && e.UserID != msg.SendID }) @@ -173,7 +176,7 @@ func (p *Pusher) k8sOfflinePush2SuperGroup(ctx context.Context, groupID string, if msg.ContentType != constant.SignalingNotification { resp, err := p.conversationRpcClient.Client.GetConversationOfflinePushUserIDs( ctx, - &conversation.GetConversationOfflinePushUserIDsReq{ConversationID: utils.GenGroupConversationID(groupID), UserIDs: needOfflinePushUserIDs}, + &conversation.GetConversationOfflinePushUserIDsReq{ConversationID: conversationutil.GenGroupConversationID(groupID), UserIDs: needOfflinePushUserIDs}, ) if err != nil { return err @@ -220,7 +223,7 @@ func (p *Pusher) Push2SuperGroup(ctx context.Context, groupID string, msg *sdkws if p.UnmarshalNotificationElem(msg.Content, &tips) != nil { return err } - kickedUsers := utils.Slice(tips.KickedUserList, func(e *sdkws.GroupMemberFullInfo) string { return e.UserID }) + kickedUsers := datautil.Slice(tips.KickedUserList, func(e *sdkws.GroupMemberFullInfo) string { return e.UserID }) defer func(groupID string, userIDs []string) { if err = p.DeleteMemberAndSetConversationSeq(ctx, groupID, userIDs); err != nil { log.ZError(ctx, "MemberKickedNotification DeleteMemberAndSetConversationSeq", err, "groupID", groupID, "userIDs", userIDs) @@ -256,7 +259,7 @@ func (p *Pusher) Push2SuperGroup(ctx context.Context, groupID string, msg *sdkws } log.ZDebug(ctx, "get conn and online push success", "result", wsResults, "msg", msg) - isOfflinePush := utils.GetSwitchFromOptions(msg.Options, constant.IsOfflinePush) + isOfflinePush := datautil.GetSwitchFromOptions(msg.Options, constant.IsOfflinePush) if isOfflinePush && p.config.Envs.Discovery == "k8s" { return p.k8sOfflinePush2SuperGroup(ctx, groupID, msg, wsResults) } @@ -293,7 +296,7 @@ func (p *Pusher) Push2SuperGroup(ctx context.Context, groupID string, msg *sdkws } } - needOfflinePushUserIDs := utils.DifferenceString(onlineSuccessUserIDs, pushToUserIDs) + needOfflinePushUserIDs := stringutil.DifferenceString(onlineSuccessUserIDs, pushToUserIDs) // Use offline push messaging if len(needOfflinePushUserIDs) > 0 { @@ -309,7 +312,7 @@ func (p *Pusher) Push2SuperGroup(ctx context.Context, groupID string, msg *sdkws if msg.ContentType != constant.SignalingNotification { resp, err := p.conversationRpcClient.Client.GetConversationOfflinePushUserIDs( ctx, - &conversation.GetConversationOfflinePushUserIDsReq{ConversationID: utils.GenGroupConversationID(groupID), UserIDs: needOfflinePushUserIDs}, + &conversation.GetConversationOfflinePushUserIDsReq{ConversationID: conversationutil.GenGroupConversationID(groupID), UserIDs: needOfflinePushUserIDs}, ) if err != nil { return err @@ -320,8 +323,8 @@ func (p *Pusher) Push2SuperGroup(ctx context.Context, groupID string, msg *sdkws log.ZError(ctx, "offlinePushMsg failed", err, "groupID", groupID, "msg", msg) return err } - if _, err := p.GetConnsAndOnlinePush(ctx, msg, utils.IntersectString(resp.UserIDs, webAndPcBackgroundUserIDs)); err != nil { - log.ZError(ctx, "offlinePushMsg failed", err, "groupID", groupID, "msg", msg, "userIDs", utils.IntersectString(needOfflinePushUserIDs, webAndPcBackgroundUserIDs)) + if _, err := p.GetConnsAndOnlinePush(ctx, msg, stringutil.IntersectString(resp.UserIDs, webAndPcBackgroundUserIDs)); err != nil { + log.ZError(ctx, "offlinePushMsg failed", err, "groupID", groupID, "msg", msg, "userIDs", stringutil.IntersectString(needOfflinePushUserIDs, webAndPcBackgroundUserIDs)) return err } } @@ -503,8 +506,8 @@ func (p *Pusher) getOfflinePushInfos(conversationID string, msg *sdkws.MsgData) title = constant.ContentType2PushContent[int64(msg.ContentType)] case constant.AtText: ac := atContent{} - _ = utils.JsonStringToStruct(string(msg.Content), &ac) - if utils.Contain(conversationID, ac.AtUserList...) { + _ = jsonutil.JsonStringToStruct(string(msg.Content), &ac) + if datautil.Contain(conversationID, ac.AtUserList...) { title = constant.ContentType2PushContent[constant.AtText] + constant.ContentType2PushContent[constant.Common] } else { title = constant.ContentType2PushContent[constant.GroupMsg]