fix: discov update.

pull/2100/head
Gordon 2 years ago
parent 25114e403c
commit 1f6f8964dd

@ -18,6 +18,10 @@ import (
"context" "context"
"encoding/json" "encoding/json"
"errors" "errors"
"github.com/openimsdk/open-im-server/v3/pkg/util/conversationutil"
"github.com/openimsdk/tools/utils/datautil"
"github.com/openimsdk/tools/utils/jsonutil"
"github.com/openimsdk/tools/utils/stringutil"
"sync" "sync"
"github.com/openimsdk/open-im-server/v3/internal/push/offlinepush" "github.com/openimsdk/open-im-server/v3/internal/push/offlinepush"
@ -36,10 +40,9 @@ import (
"github.com/openimsdk/protocol/conversation" "github.com/openimsdk/protocol/conversation"
"github.com/openimsdk/protocol/msggateway" "github.com/openimsdk/protocol/msggateway"
"github.com/openimsdk/protocol/sdkws" "github.com/openimsdk/protocol/sdkws"
"github.com/openimsdk/tools/discoveryregistry" "github.com/openimsdk/tools/discovery"
"github.com/openimsdk/tools/log" "github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/mcontext" "github.com/openimsdk/tools/mcontext"
"github.com/openimsdk/tools/utils"
"golang.org/x/sync/errgroup" "golang.org/x/sync/errgroup"
"google.golang.org/grpc" "google.golang.org/grpc"
) )
@ -47,7 +50,7 @@ import (
type Pusher struct { type Pusher struct {
config *config.GlobalConfig config *config.GlobalConfig
database controller.PushDatabase database controller.PushDatabase
discov discoveryregistry.SvcDiscoveryRegistry discov discovery.SvcDiscoveryRegistry
offlinePusher offlinepush.OfflinePusher offlinePusher offlinepush.OfflinePusher
groupLocalCache *rpccache.GroupLocalCache groupLocalCache *rpccache.GroupLocalCache
conversationLocalCache *rpccache.ConversationLocalCache conversationLocalCache *rpccache.ConversationLocalCache
@ -58,7 +61,7 @@ type Pusher struct {
var errNoOfflinePusher = errors.New("no offlinePusher is configured") var errNoOfflinePusher = errors.New("no offlinePusher is configured")
func NewPusher(config *config.GlobalConfig, discov discoveryregistry.SvcDiscoveryRegistry, offlinePusher offlinepush.OfflinePusher, database controller.PushDatabase, func NewPusher(config *config.GlobalConfig, discov discovery.SvcDiscoveryRegistry, offlinePusher offlinepush.OfflinePusher, database controller.PushDatabase,
groupLocalCache *rpccache.GroupLocalCache, conversationLocalCache *rpccache.ConversationLocalCache, groupLocalCache *rpccache.GroupLocalCache, conversationLocalCache *rpccache.ConversationLocalCache,
conversationRpcClient *rpcclient.ConversationRpcClient, groupRpcClient *rpcclient.GroupRpcClient, msgRpcClient *rpcclient.MessageRpcClient, conversationRpcClient *rpcclient.ConversationRpcClient, groupRpcClient *rpcclient.GroupRpcClient, msgRpcClient *rpcclient.MessageRpcClient,
) *Pusher { ) *Pusher {
@ -110,7 +113,7 @@ func (p *Pusher) Push2User(ctx context.Context, userIDs []string, msg *sdkws.Msg
return err return err
} }
isOfflinePush := utils.GetSwitchFromOptions(msg.Options, constant.IsOfflinePush) isOfflinePush := datautil.GetSwitchFromOptions(msg.Options, constant.IsOfflinePush)
log.ZDebug(ctx, "push_result", "ws push result", wsResults, "sendData", msg, "isOfflinePush", isOfflinePush, "push_to_userID", userIDs) log.ZDebug(ctx, "push_result", "ws push result", wsResults, "sendData", msg, "isOfflinePush", isOfflinePush, "push_to_userID", userIDs)
if !isOfflinePush { if !isOfflinePush {
@ -120,10 +123,10 @@ func (p *Pusher) Push2User(ctx context.Context, userIDs []string, msg *sdkws.Msg
if len(wsResults) == 0 { if len(wsResults) == 0 {
return nil return nil
} }
onlinePushSuccUserIDSet := utils.SliceSet(utils.Filter(wsResults, func(e *msggateway.SingleMsgToUserResults) (string, bool) { onlinePushSuccUserIDSet := datautil.SliceSet(datautil.Filter(wsResults, func(e *msggateway.SingleMsgToUserResults) (string, bool) {
return e.UserID, e.OnlinePush && e.UserID != "" return e.UserID, e.OnlinePush && e.UserID != ""
})) }))
offlinePushUserIDList := utils.Filter(wsResults, func(e *msggateway.SingleMsgToUserResults) (string, bool) { offlinePushUserIDList := datautil.Filter(wsResults, func(e *msggateway.SingleMsgToUserResults) (string, bool) {
_, exist := onlinePushSuccUserIDSet[e.UserID] _, exist := onlinePushSuccUserIDSet[e.UserID]
return e.UserID, !exist && e.UserID != "" && e.UserID != msg.SendID return e.UserID, !exist && e.UserID != "" && e.UserID != msg.SendID
}) })
@ -173,7 +176,7 @@ func (p *Pusher) k8sOfflinePush2SuperGroup(ctx context.Context, groupID string,
if msg.ContentType != constant.SignalingNotification { if msg.ContentType != constant.SignalingNotification {
resp, err := p.conversationRpcClient.Client.GetConversationOfflinePushUserIDs( resp, err := p.conversationRpcClient.Client.GetConversationOfflinePushUserIDs(
ctx, ctx,
&conversation.GetConversationOfflinePushUserIDsReq{ConversationID: utils.GenGroupConversationID(groupID), UserIDs: needOfflinePushUserIDs}, &conversation.GetConversationOfflinePushUserIDsReq{ConversationID: conversationutil.GenGroupConversationID(groupID), UserIDs: needOfflinePushUserIDs},
) )
if err != nil { if err != nil {
return err return err
@ -220,7 +223,7 @@ func (p *Pusher) Push2SuperGroup(ctx context.Context, groupID string, msg *sdkws
if p.UnmarshalNotificationElem(msg.Content, &tips) != nil { if p.UnmarshalNotificationElem(msg.Content, &tips) != nil {
return err return err
} }
kickedUsers := utils.Slice(tips.KickedUserList, func(e *sdkws.GroupMemberFullInfo) string { return e.UserID }) kickedUsers := datautil.Slice(tips.KickedUserList, func(e *sdkws.GroupMemberFullInfo) string { return e.UserID })
defer func(groupID string, userIDs []string) { defer func(groupID string, userIDs []string) {
if err = p.DeleteMemberAndSetConversationSeq(ctx, groupID, userIDs); err != nil { if err = p.DeleteMemberAndSetConversationSeq(ctx, groupID, userIDs); err != nil {
log.ZError(ctx, "MemberKickedNotification DeleteMemberAndSetConversationSeq", err, "groupID", groupID, "userIDs", userIDs) log.ZError(ctx, "MemberKickedNotification DeleteMemberAndSetConversationSeq", err, "groupID", groupID, "userIDs", userIDs)
@ -256,7 +259,7 @@ func (p *Pusher) Push2SuperGroup(ctx context.Context, groupID string, msg *sdkws
} }
log.ZDebug(ctx, "get conn and online push success", "result", wsResults, "msg", msg) log.ZDebug(ctx, "get conn and online push success", "result", wsResults, "msg", msg)
isOfflinePush := utils.GetSwitchFromOptions(msg.Options, constant.IsOfflinePush) isOfflinePush := datautil.GetSwitchFromOptions(msg.Options, constant.IsOfflinePush)
if isOfflinePush && p.config.Envs.Discovery == "k8s" { if isOfflinePush && p.config.Envs.Discovery == "k8s" {
return p.k8sOfflinePush2SuperGroup(ctx, groupID, msg, wsResults) return p.k8sOfflinePush2SuperGroup(ctx, groupID, msg, wsResults)
} }
@ -293,7 +296,7 @@ func (p *Pusher) Push2SuperGroup(ctx context.Context, groupID string, msg *sdkws
} }
} }
needOfflinePushUserIDs := utils.DifferenceString(onlineSuccessUserIDs, pushToUserIDs) needOfflinePushUserIDs := stringutil.DifferenceString(onlineSuccessUserIDs, pushToUserIDs)
// Use offline push messaging // Use offline push messaging
if len(needOfflinePushUserIDs) > 0 { if len(needOfflinePushUserIDs) > 0 {
@ -309,7 +312,7 @@ func (p *Pusher) Push2SuperGroup(ctx context.Context, groupID string, msg *sdkws
if msg.ContentType != constant.SignalingNotification { if msg.ContentType != constant.SignalingNotification {
resp, err := p.conversationRpcClient.Client.GetConversationOfflinePushUserIDs( resp, err := p.conversationRpcClient.Client.GetConversationOfflinePushUserIDs(
ctx, ctx,
&conversation.GetConversationOfflinePushUserIDsReq{ConversationID: utils.GenGroupConversationID(groupID), UserIDs: needOfflinePushUserIDs}, &conversation.GetConversationOfflinePushUserIDsReq{ConversationID: conversationutil.GenGroupConversationID(groupID), UserIDs: needOfflinePushUserIDs},
) )
if err != nil { if err != nil {
return err return err
@ -320,8 +323,8 @@ func (p *Pusher) Push2SuperGroup(ctx context.Context, groupID string, msg *sdkws
log.ZError(ctx, "offlinePushMsg failed", err, "groupID", groupID, "msg", msg) log.ZError(ctx, "offlinePushMsg failed", err, "groupID", groupID, "msg", msg)
return err return err
} }
if _, err := p.GetConnsAndOnlinePush(ctx, msg, utils.IntersectString(resp.UserIDs, webAndPcBackgroundUserIDs)); err != nil { if _, err := p.GetConnsAndOnlinePush(ctx, msg, stringutil.IntersectString(resp.UserIDs, webAndPcBackgroundUserIDs)); err != nil {
log.ZError(ctx, "offlinePushMsg failed", err, "groupID", groupID, "msg", msg, "userIDs", utils.IntersectString(needOfflinePushUserIDs, webAndPcBackgroundUserIDs)) log.ZError(ctx, "offlinePushMsg failed", err, "groupID", groupID, "msg", msg, "userIDs", stringutil.IntersectString(needOfflinePushUserIDs, webAndPcBackgroundUserIDs))
return err return err
} }
} }
@ -503,8 +506,8 @@ func (p *Pusher) getOfflinePushInfos(conversationID string, msg *sdkws.MsgData)
title = constant.ContentType2PushContent[int64(msg.ContentType)] title = constant.ContentType2PushContent[int64(msg.ContentType)]
case constant.AtText: case constant.AtText:
ac := atContent{} ac := atContent{}
_ = utils.JsonStringToStruct(string(msg.Content), &ac) _ = jsonutil.JsonStringToStruct(string(msg.Content), &ac)
if utils.Contain(conversationID, ac.AtUserList...) { if datautil.Contain(conversationID, ac.AtUserList...) {
title = constant.ContentType2PushContent[constant.AtText] + constant.ContentType2PushContent[constant.Common] title = constant.ContentType2PushContent[constant.AtText] + constant.ContentType2PushContent[constant.Common]
} else { } else {
title = constant.ContentType2PushContent[constant.GroupMsg] title = constant.ContentType2PushContent[constant.GroupMsg]

Loading…
Cancel
Save