|
|
|
@ -18,6 +18,7 @@ import (
|
|
|
|
|
"context"
|
|
|
|
|
"encoding/json"
|
|
|
|
|
"errors"
|
|
|
|
|
"google.golang.org/grpc"
|
|
|
|
|
"sync"
|
|
|
|
|
|
|
|
|
|
"golang.org/x/sync/errgroup"
|
|
|
|
@ -142,6 +143,47 @@ func (p *Pusher) UnmarshalNotificationElem(bytes []byte, t any) error {
|
|
|
|
|
return json.Unmarshal([]byte(notification.Detail), t)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
k8s deployment,offline push group messages function
|
|
|
|
|
*/
|
|
|
|
|
func (p *Pusher) k8sOfflinePush2SuperGroup(ctx context.Context, groupID string, msg *sdkws.MsgData, wsResults []*msggateway.SingleMsgToUserResults) error {
|
|
|
|
|
|
|
|
|
|
var needOfflinePushUserIDs []string
|
|
|
|
|
for _, v := range wsResults {
|
|
|
|
|
if !v.OnlinePush {
|
|
|
|
|
needOfflinePushUserIDs = append(needOfflinePushUserIDs, v.UserID)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
if len(needOfflinePushUserIDs) > 0 {
|
|
|
|
|
var offlinePushUserIDs []string
|
|
|
|
|
err := callbackOfflinePush(ctx, needOfflinePushUserIDs, msg, &offlinePushUserIDs)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if len(offlinePushUserIDs) > 0 {
|
|
|
|
|
needOfflinePushUserIDs = offlinePushUserIDs
|
|
|
|
|
}
|
|
|
|
|
if msg.ContentType != constant.SignalingNotification {
|
|
|
|
|
resp, err := p.conversationRpcClient.Client.GetConversationOfflinePushUserIDs(
|
|
|
|
|
ctx,
|
|
|
|
|
&conversation.GetConversationOfflinePushUserIDsReq{ConversationID: utils.GenGroupConversationID(groupID), UserIDs: needOfflinePushUserIDs},
|
|
|
|
|
)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
if len(resp.UserIDs) > 0 {
|
|
|
|
|
err = p.offlinePushMsg(ctx, groupID, msg, resp.UserIDs)
|
|
|
|
|
if err != nil {
|
|
|
|
|
log.ZError(ctx, "offlinePushMsg failed", err, "groupID", groupID, "msg", msg)
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
func (p *Pusher) Push2SuperGroup(ctx context.Context, groupID string, msg *sdkws.MsgData) (err error) {
|
|
|
|
|
log.ZDebug(ctx, "Get super group msg from msg_transfer and push msg", "msg", msg.String(), "groupID", groupID)
|
|
|
|
|
var pushToUserIDs []string
|
|
|
|
@ -205,7 +247,10 @@ func (p *Pusher) Push2SuperGroup(ctx context.Context, groupID string, msg *sdkws
|
|
|
|
|
|
|
|
|
|
log.ZDebug(ctx, "get conn and online push success", "result", wsResults, "msg", msg)
|
|
|
|
|
isOfflinePush := utils.GetSwitchFromOptions(msg.Options, constant.IsOfflinePush)
|
|
|
|
|
if isOfflinePush {
|
|
|
|
|
if isOfflinePush && config.Config.Envs.Discovery == "k8s" {
|
|
|
|
|
return p.k8sOfflinePush2SuperGroup(ctx, groupID, msg, wsResults)
|
|
|
|
|
}
|
|
|
|
|
if isOfflinePush && config.Config.Envs.Discovery == "zookeeper" {
|
|
|
|
|
var (
|
|
|
|
|
onlineSuccessUserIDs = []string{msg.SendID}
|
|
|
|
|
webAndPcBackgroundUserIDs []string
|
|
|
|
@ -239,14 +284,7 @@ func (p *Pusher) Push2SuperGroup(ctx context.Context, groupID string, msg *sdkws
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
needOfflinePushUserIDs := utils.DifferenceString(onlineSuccessUserIDs, pushToUserIDs)
|
|
|
|
|
if msg.ContentType != constant.SignalingNotification {
|
|
|
|
|
notNotificationUserIDs, err := p.conversationLocalCache.GetRecvMsgNotNotifyUserIDs(ctx, groupID)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
needOfflinePushUserIDs = utils.SliceSub(needOfflinePushUserIDs, notNotificationUserIDs)
|
|
|
|
|
}
|
|
|
|
|
// Use offline push messaging
|
|
|
|
|
if len(needOfflinePushUserIDs) > 0 {
|
|
|
|
|
var offlinePushUserIDs []string
|
|
|
|
@ -258,30 +296,89 @@ func (p *Pusher) Push2SuperGroup(ctx context.Context, groupID string, msg *sdkws
|
|
|
|
|
if len(offlinePushUserIDs) > 0 {
|
|
|
|
|
needOfflinePushUserIDs = offlinePushUserIDs
|
|
|
|
|
}
|
|
|
|
|
resp, err := p.conversationRpcClient.Client.GetConversationOfflinePushUserIDs(
|
|
|
|
|
ctx,
|
|
|
|
|
&conversation.GetConversationOfflinePushUserIDsReq{ConversationID: utils.GenGroupConversationID(groupID), UserIDs: needOfflinePushUserIDs},
|
|
|
|
|
)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
if len(resp.UserIDs) > 0 {
|
|
|
|
|
err = p.offlinePushMsg(ctx, groupID, msg, resp.UserIDs)
|
|
|
|
|
if msg.ContentType != constant.SignalingNotification {
|
|
|
|
|
resp, err := p.conversationRpcClient.Client.GetConversationOfflinePushUserIDs(
|
|
|
|
|
ctx,
|
|
|
|
|
&conversation.GetConversationOfflinePushUserIDsReq{ConversationID: utils.GenGroupConversationID(groupID), UserIDs: needOfflinePushUserIDs},
|
|
|
|
|
)
|
|
|
|
|
if err != nil {
|
|
|
|
|
log.ZError(ctx, "offlinePushMsg failed", err, "groupID", groupID, "msg", msg)
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
if _, err := p.GetConnsAndOnlinePush(ctx, msg, utils.IntersectString(resp.UserIDs, webAndPcBackgroundUserIDs)); err != nil {
|
|
|
|
|
log.ZError(ctx, "offlinePushMsg failed", err, "groupID", groupID, "msg", msg, "userIDs", utils.IntersectString(needOfflinePushUserIDs, webAndPcBackgroundUserIDs))
|
|
|
|
|
return err
|
|
|
|
|
if len(resp.UserIDs) > 0 {
|
|
|
|
|
err = p.offlinePushMsg(ctx, groupID, msg, resp.UserIDs)
|
|
|
|
|
if err != nil {
|
|
|
|
|
log.ZError(ctx, "offlinePushMsg failed", err, "groupID", groupID, "msg", msg)
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
if _, err := p.GetConnsAndOnlinePush(ctx, msg, utils.IntersectString(resp.UserIDs, webAndPcBackgroundUserIDs)); err != nil {
|
|
|
|
|
log.ZError(ctx, "offlinePushMsg failed", err, "groupID", groupID, "msg", msg, "userIDs", utils.IntersectString(needOfflinePushUserIDs, webAndPcBackgroundUserIDs))
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (p *Pusher) k8sOnlinePush(ctx context.Context, msg *sdkws.MsgData, pushToUserIDs []string) (wsResults []*msggateway.SingleMsgToUserResults, err error) {
|
|
|
|
|
var usersHost = make(map[string][]string)
|
|
|
|
|
for _, v := range pushToUserIDs {
|
|
|
|
|
tHost, err := p.discov.GetUserIdHashGatewayHost(ctx, v)
|
|
|
|
|
if err != nil {
|
|
|
|
|
log.ZError(ctx, "get msggateway hash error", err)
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
tUsers, tbl := usersHost[tHost]
|
|
|
|
|
if tbl {
|
|
|
|
|
tUsers = append(tUsers, v)
|
|
|
|
|
usersHost[tHost] = tUsers
|
|
|
|
|
} else {
|
|
|
|
|
usersHost[tHost] = []string{v}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
log.ZDebug(ctx, "genUsers send hosts struct:", "usersHost", usersHost)
|
|
|
|
|
var usersConns = make(map[*grpc.ClientConn][]string)
|
|
|
|
|
for host, userIds := range usersHost {
|
|
|
|
|
tconn, _ := p.discov.GetConn(ctx, host)
|
|
|
|
|
usersConns[tconn] = userIds
|
|
|
|
|
}
|
|
|
|
|
var (
|
|
|
|
|
mu sync.Mutex
|
|
|
|
|
wg = errgroup.Group{}
|
|
|
|
|
maxWorkers = config.Config.Push.MaxConcurrentWorkers
|
|
|
|
|
)
|
|
|
|
|
if maxWorkers < 3 {
|
|
|
|
|
maxWorkers = 3
|
|
|
|
|
}
|
|
|
|
|
wg.SetLimit(maxWorkers)
|
|
|
|
|
for conn, userIds := range usersConns {
|
|
|
|
|
tcon := conn
|
|
|
|
|
tuserIds := userIds
|
|
|
|
|
wg.Go(func() error {
|
|
|
|
|
input := &msggateway.OnlineBatchPushOneMsgReq{MsgData: msg, PushToUserIDs: tuserIds}
|
|
|
|
|
msgClient := msggateway.NewMsgGatewayClient(tcon)
|
|
|
|
|
reply, err := msgClient.SuperGroupOnlineBatchPushOneMsg(ctx, input)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
log.ZDebug(ctx, "push result", "reply", reply)
|
|
|
|
|
if reply != nil && reply.SinglePushResult != nil {
|
|
|
|
|
mu.Lock()
|
|
|
|
|
wsResults = append(wsResults, reply.SinglePushResult...)
|
|
|
|
|
mu.Unlock()
|
|
|
|
|
}
|
|
|
|
|
return nil
|
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
_ = wg.Wait()
|
|
|
|
|
return wsResults, nil
|
|
|
|
|
}
|
|
|
|
|
func (p *Pusher) GetConnsAndOnlinePush(ctx context.Context, msg *sdkws.MsgData, pushToUserIDs []string) (wsResults []*msggateway.SingleMsgToUserResults, err error) {
|
|
|
|
|
if config.Config.Envs.Discovery == "k8s" {
|
|
|
|
|
return p.k8sOnlinePush(ctx, msg, pushToUserIDs)
|
|
|
|
|
}
|
|
|
|
|
conns, err := p.discov.GetConns(ctx, config.Config.RpcRegisterName.OpenImMessageGatewayName)
|
|
|
|
|
log.ZDebug(ctx, "get gateway conn", "conn length", len(conns))
|
|
|
|
|
if err != nil {
|
|
|
|
|