diff --git a/go.mod b/go.mod index 11d374d08..e255f8ddf 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ go 1.19 require ( firebase.google.com/go v3.13.0+incompatible github.com/OpenIMSDK/protocol v0.0.31 - github.com/OpenIMSDK/tools v0.0.20 + github.com/OpenIMSDK/tools v0.0.21 github.com/bwmarrin/snowflake v0.3.0 // indirect github.com/dtm-labs/rockscache v0.1.1 github.com/gin-gonic/gin v1.9.1 @@ -38,6 +38,7 @@ require ( github.com/aliyun/aliyun-oss-go-sdk v2.2.9+incompatible github.com/go-redis/redis v6.15.9+incompatible github.com/redis/go-redis/v9 v9.2.1 + github.com/stathat/consistent v1.0.0 github.com/tencentyun/cos-go-sdk-v5 v0.7.45 go.uber.org/automaxprocs v1.5.3 golang.org/x/sync v0.4.0 @@ -141,6 +142,7 @@ require ( gopkg.in/src-d/go-billy.v4 v4.3.2 // indirect gopkg.in/warnings.v0 v0.1.2 // indirect gorm.io/gorm v1.23.8 // indirect + stathat.com/c/consistent v1.0.0 // indirect ) require ( diff --git a/go.sum b/go.sum index 30e4b3cb4..2d7edd845 100644 --- a/go.sum +++ b/go.sum @@ -20,8 +20,8 @@ github.com/IBM/sarama v1.41.3 h1:MWBEJ12vHC8coMjdEXFq/6ftO6DUZnQlFYcxtOJFa7c= github.com/IBM/sarama v1.41.3/go.mod h1:Xxho9HkHd4K/MDUo/T/sOqwtX/17D33++E9Wib6hUdQ= github.com/OpenIMSDK/protocol v0.0.31 h1:ax43x9aqA6EKNXNukS5MT5BSTqkUmwO4uTvbJLtzCgE= github.com/OpenIMSDK/protocol v0.0.31/go.mod h1:F25dFrwrIx3lkNoiuf6FkCfxuwf8L4Z8UIsdTHP/r0Y= -github.com/OpenIMSDK/tools v0.0.20 h1:zBTjQZRJ5lR1FIzP9mtWyAvh5dKsmJXQugi4p8X/97k= -github.com/OpenIMSDK/tools v0.0.20/go.mod h1:eg+q4A34Qmu73xkY0mt37FHGMCMfC6CtmOnm0kFEGFI= +github.com/OpenIMSDK/tools v0.0.21 h1:iTapc2mIEVH/xl5Nd6jfwPub11Pgp44tVcE1rjB3a48= +github.com/OpenIMSDK/tools v0.0.21/go.mod h1:eg+q4A34Qmu73xkY0mt37FHGMCMfC6CtmOnm0kFEGFI= github.com/QcloudApi/qcloud_sign_golang v0.0.0-20141224014652-e4130a326409/go.mod h1:1pk82RBxDY/JZnPQrtqHlUFfCctgdorsd9M06fMynOM= github.com/alcortesm/tgz v0.0.0-20161220082320-9c5fe88206d7 h1:uSoVVbwJiQipAclBbw+8quDsfcvFjOpI5iCf4p/cqCs= github.com/alcortesm/tgz v0.0.0-20161220082320-9c5fe88206d7/go.mod h1:6zEj6s6u/ghQa61ZWa/C2Aw3RkjiTBOix7dkqa1VLIs= @@ -308,6 +308,8 @@ github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= github.com/src-d/gcfg v1.4.0 h1:xXbNR5AlLSA315x2UO+fTSSAXCDf+Ar38/6oyGbDKQ4= github.com/src-d/gcfg v1.4.0/go.mod h1:p/UMsR43ujA89BJY9duynAwIpvqEujIH/jFlfL7jWoI= +github.com/stathat/consistent v1.0.0 h1:ZFJ1QTRn8npNBKW065raSZ8xfOqhpb8vLOkfp4CcL/U= +github.com/stathat/consistent v1.0.0/go.mod h1:uajTPbgSygZBJ+V+0mY7meZ8i0XAcZs7AQ6V121XSxw= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.2.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= @@ -536,3 +538,5 @@ gotest.tools v2.2.0+incompatible/go.mod h1:DsYFclhRJ6vuDpmuTbkuFWG+y2sxOXAzmJt81 honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= rsc.io/pdf v0.1.1/go.mod h1:n8OzWcQ6Sp37PL01nO98y4iUCRdTGarVfzxY20ICaU4= +stathat.com/c/consistent v1.0.0 h1:ezyc51EGcRPJUxfHGSgJjWzJdj3NiMU9pNfLNGiXV0c= +stathat.com/c/consistent v1.0.0/go.mod h1:QkzMWzcbB+yQBL2AttO6sgsQS/JSTapcDISJalmCDS0= diff --git a/internal/msggateway/n_ws_server.go b/internal/msggateway/n_ws_server.go index 737d5db14..70c2f8fe0 100644 --- a/internal/msggateway/n_ws_server.go +++ b/internal/msggateway/n_ws_server.go @@ -288,12 +288,13 @@ func (ws *WsServer) registerClient(client *Client) { } wg := sync.WaitGroup{} - wg.Add(1) - go func() { - defer wg.Done() - _ = ws.sendUserOnlineInfoToOtherNode(client.ctx, client) - }() - + if config.Config.Envs.Discovery == "zookeeper" { + wg.Add(1) + go func() { + defer wg.Done() + _ = ws.sendUserOnlineInfoToOtherNode(client.ctx, client) + }() + } wg.Add(1) go func() { defer wg.Done() diff --git a/internal/push/push_to_client.go b/internal/push/push_to_client.go index 2ee8c457f..b3d48aef6 100644 --- a/internal/push/push_to_client.go +++ b/internal/push/push_to_client.go @@ -18,6 +18,7 @@ import ( "context" "encoding/json" "errors" + "google.golang.org/grpc" "sync" "golang.org/x/sync/errgroup" @@ -141,7 +142,51 @@ func (p *Pusher) UnmarshalNotificationElem(bytes []byte, t any) error { return json.Unmarshal([]byte(notification.Detail), t) } +func (p *Pusher) k8sOfflinePush2SuperGroup(ctx context.Context, groupID string, msg *sdkws.MsgData, wsResults []*msggateway.SingleMsgToUserResults) error { + var needOfflinePushUserIDs []string + for _, v := range wsResults { + if !v.OnlinePush { + needOfflinePushUserIDs = append(needOfflinePushUserIDs, v.UserID) + } + } + if len(needOfflinePushUserIDs) > 0 { + if msg.ContentType != constant.SignalingNotification { + notNotificationUserIDs, err := p.conversationLocalCache.GetRecvMsgNotNotifyUserIDs(ctx, groupID) + if err != nil { + return err + } + + needOfflinePushUserIDs = utils.SliceSub(needOfflinePushUserIDs, notNotificationUserIDs) + } + } + if len(needOfflinePushUserIDs) > 0 { + var offlinePushUserIDs []string + err := callbackOfflinePush(ctx, needOfflinePushUserIDs, msg, &offlinePushUserIDs) + if err != nil { + return err + } + + if len(offlinePushUserIDs) > 0 { + needOfflinePushUserIDs = offlinePushUserIDs + } + resp, err := p.conversationRpcClient.Client.GetConversationOfflinePushUserIDs( + ctx, + &conversation.GetConversationOfflinePushUserIDsReq{ConversationID: utils.GenGroupConversationID(groupID), UserIDs: needOfflinePushUserIDs}, + ) + if err != nil { + return err + } + if len(resp.UserIDs) > 0 { + err = p.offlinePushMsg(ctx, groupID, msg, resp.UserIDs) + if err != nil { + log.ZError(ctx, "offlinePushMsg failed", err, "groupID", groupID, "msg", msg) + return err + } + } + } + return nil +} func (p *Pusher) Push2SuperGroup(ctx context.Context, groupID string, msg *sdkws.MsgData) (err error) { log.ZDebug(ctx, "Get super group msg from msg_transfer and push msg", "msg", msg.String(), "groupID", groupID) var pushToUserIDs []string @@ -205,7 +250,10 @@ func (p *Pusher) Push2SuperGroup(ctx context.Context, groupID string, msg *sdkws log.ZDebug(ctx, "get conn and online push success", "result", wsResults, "msg", msg) isOfflinePush := utils.GetSwitchFromOptions(msg.Options, constant.IsOfflinePush) - if isOfflinePush { + if isOfflinePush && config.Config.Envs.Discovery == "k8s" { + return p.k8sOfflinePush2SuperGroup(ctx, groupID, msg, wsResults) + } + if isOfflinePush && config.Config.Envs.Discovery == "zookeeper" { var ( onlineSuccessUserIDs = []string{msg.SendID} webAndPcBackgroundUserIDs []string @@ -281,7 +329,61 @@ func (p *Pusher) Push2SuperGroup(ctx context.Context, groupID string, msg *sdkws return nil } +func (p *Pusher) k8sOnlinePush(ctx context.Context, msg *sdkws.MsgData, pushToUserIDs []string) (wsResults []*msggateway.SingleMsgToUserResults, err error) { + var usersHost = make(map[string][]string) + for _, v := range pushToUserIDs { + tHost, err := p.discov.GetUserIdHashGatewayHost(ctx, v) + if err != nil { + log.ZError(ctx, "get msggateway hash error", err) + return nil, err + } + tUsers, tbl := usersHost[tHost] + if tbl { + tUsers = append(tUsers, v) + usersHost[tHost] = tUsers + } else { + usersHost[tHost] = []string{v} + } + } + log.ZDebug(ctx, "genUsers send hosts struct:", usersHost) + var usersConns = make(map[*grpc.ClientConn][]string) + for host, userIds := range usersHost { + tconn, _ := p.discov.GetConn(ctx, host) + usersConns[tconn] = userIds + } + var ( + mu sync.Mutex + wg = errgroup.Group{} + maxWorkers = config.Config.Push.MaxConcurrentWorkers + ) + if maxWorkers < 3 { + maxWorkers = 3 + } + wg.SetLimit(maxWorkers) + for conn, userIds := range usersConns { + wg.Go(func() error { + input := &msggateway.OnlineBatchPushOneMsgReq{MsgData: msg, PushToUserIDs: userIds} + msgClient := msggateway.NewMsgGatewayClient(conn) + reply, err := msgClient.SuperGroupOnlineBatchPushOneMsg(ctx, input) + if err != nil { + return nil + } + log.ZDebug(ctx, "push result", "reply", reply) + if reply != nil && reply.SinglePushResult != nil { + mu.Lock() + wsResults = append(wsResults, reply.SinglePushResult...) + mu.Unlock() + } + return nil + }) + } + _ = wg.Wait() + return wsResults, nil +} func (p *Pusher) GetConnsAndOnlinePush(ctx context.Context, msg *sdkws.MsgData, pushToUserIDs []string) (wsResults []*msggateway.SingleMsgToUserResults, err error) { + if config.Config.Envs.Discovery == "k8s" { + return p.k8sOnlinePush(ctx, msg, pushToUserIDs) + } conns, err := p.discov.GetConns(ctx, config.Config.RpcRegisterName.OpenImMessageGatewayName) log.ZDebug(ctx, "get gateway conn", "conn length", len(conns)) if err != nil { diff --git a/pkg/common/discoveryregister/discoveryregister.go b/pkg/common/discoveryregister/discoveryregister.go index 62c1f4a31..5671fe4f2 100644 --- a/pkg/common/discoveryregister/discoveryregister.go +++ b/pkg/common/discoveryregister/discoveryregister.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "github.com/stathat/consistent" "os" "strconv" "strings" @@ -37,19 +38,25 @@ func NewDiscoveryRegister(envType string) (discoveryregistry.SvcDiscoveryRegistr } type K8sDR struct { - options []grpc.DialOption - rpcRegisterAddr string + options []grpc.DialOption + rpcRegisterAddr string + gatewayHostConsistent *consistent.Consistent } func NewK8sDiscoveryRegister() (discoveryregistry.SvcDiscoveryRegistry, error) { - return &K8sDR{}, nil + gatewayConsistent := consistent.New() + gatewayHosts := getMsgGatewayHost(context.Background()) + for _, v := range gatewayHosts { + gatewayConsistent.Add(v) + } + return &K8sDR{gatewayHostConsistent: gatewayConsistent}, nil } func (cli *K8sDR) Register(serviceName, host string, port int, opts ...grpc.DialOption) error { if serviceName != config.Config.RpcRegisterName.OpenImMessageGatewayName { cli.rpcRegisterAddr = serviceName } else { - cli.rpcRegisterAddr = cli.getSelfHost(context.Background()) + cli.rpcRegisterAddr = getSelfHost(context.Background()) } return nil @@ -71,7 +78,14 @@ func (cli *K8sDR) GetConfFromRegistry(key string) ([]byte, error) { return nil, nil } -func (cli *K8sDR) getSelfHost(ctx context.Context) string { +func (cli *K8sDR) GetUserIdHashGatewayHost(ctx context.Context, userId string) (string, error) { + host, err := cli.gatewayHostConsistent.Get(userId) + if err != nil { + log.ZError(ctx, "GetUserIdHashGatewayHost error", err) + } + return host, err +} +func getSelfHost(ctx context.Context) string { port := 88 instance := "openimserver" selfPodName := os.Getenv("MY_POD_NAME") @@ -92,7 +106,7 @@ func (cli *K8sDR) getSelfHost(ctx context.Context) string { } // like openimserver-openim-msggateway-0.openimserver-openim-msggateway-headless.openim-lin.svc.cluster.local:88 -func (cli *K8sDR) getMsgGatewayHost(ctx context.Context) []string { +func getMsgGatewayHost(ctx context.Context) []string { port := 88 instance := "openimserver" selfPodName := os.Getenv("MY_POD_NAME") @@ -122,7 +136,7 @@ func (cli *K8sDR) GetConns(ctx context.Context, serviceName string, opts ...grpc return []*grpc.ClientConn{conn}, err } else { var ret []*grpc.ClientConn - gatewayHosts := cli.getMsgGatewayHost(ctx) + gatewayHosts := getMsgGatewayHost(ctx) for _, host := range gatewayHosts { conn, err := grpc.DialContext(ctx, host, append(cli.options, opts...)...) if err != nil { @@ -138,6 +152,7 @@ func (cli *K8sDR) GetConn(ctx context.Context, serviceName string, opts ...grpc. return grpc.DialContext(ctx, serviceName, append(cli.options, opts...)...) } + func (cli *K8sDR) GetSelfConnTarget() string { return cli.rpcRegisterAddr