diff --git a/deployments/templates/prometheus.yml b/deployments/templates/prometheus.yml index 5c3e0af66..f1a551a62 100644 --- a/deployments/templates/prometheus.yml +++ b/deployments/templates/prometheus.yml @@ -44,12 +44,12 @@ scrape_configs: # prometheus fetches application services - job_name: 'openimserver-openim-api' static_configs: - - targets: [ '${OPENIM_SERVER_ADDRESS}:${API_PROM_PORT}' ] + - targets: [ '${DOCKER_BRIDGE_GATEWAY}:${API_PROM_PORT}' ] labels: namespace: 'default' - job_name: 'openimserver-openim-msggateway' static_configs: - - targets: [ '${OPENIM_SERVER_ADDRESS}:${MSG_GATEWAY_PROM_PORT}' ] + - targets: [ '${DOCKER_BRIDGE_GATEWAY}:${MSG_GATEWAY_PROM_PORT}' ] labels: namespace: 'default' - job_name: 'openimserver-openim-msgtransfer' @@ -59,41 +59,41 @@ scrape_configs: namespace: 'default' - job_name: 'openimserver-openim-push' static_configs: - - targets: [ '${OPENIM_SERVER_ADDRESS}:${PUSH_PROM_PORT}' ] + - targets: [ '${DOCKER_BRIDGE_GATEWAY}:${PUSH_PROM_PORT}' ] labels: namespace: 'default' - job_name: 'openimserver-openim-rpc-auth' static_configs: - - targets: [ '${OPENIM_SERVER_ADDRESS}:${AUTH_PROM_PORT}' ] + - targets: [ '${DOCKER_BRIDGE_GATEWAY}:${AUTH_PROM_PORT}' ] labels: namespace: 'default' - job_name: 'openimserver-openim-rpc-conversation' static_configs: - - targets: [ '${OPENIM_SERVER_ADDRESS}:${CONVERSATION_PROM_PORT}' ] + - targets: [ '${DOCKER_BRIDGE_GATEWAY}:${CONVERSATION_PROM_PORT}' ] labels: namespace: 'default' - job_name: 'openimserver-openim-rpc-friend' static_configs: - - targets: [ '${OPENIM_SERVER_ADDRESS}:${FRIEND_PROM_PORT}' ] + - targets: [ '${DOCKER_BRIDGE_GATEWAY}:${FRIEND_PROM_PORT}' ] labels: namespace: 'default' - job_name: 'openimserver-openim-rpc-group' static_configs: - - targets: [ '${OPENIM_SERVER_ADDRESS}:${GROUP_PROM_PORT}' ] + - targets: [ '${DOCKER_BRIDGE_GATEWAY}:${GROUP_PROM_PORT}' ] labels: namespace: 'default' - job_name: 'openimserver-openim-rpc-msg' static_configs: - - targets: [ '${OPENIM_SERVER_ADDRESS}:${MESSAGE_PROM_PORT}' ] + - targets: [ '${DOCKER_BRIDGE_GATEWAY}:${MESSAGE_PROM_PORT}' ] labels: namespace: 'default' - job_name: 'openimserver-openim-rpc-third' static_configs: - - targets: [ '${OPENIM_SERVER_ADDRESS}:${THIRD_PROM_PORT}' ] + - targets: [ '${DOCKER_BRIDGE_GATEWAY}:${THIRD_PROM_PORT}' ] labels: namespace: 'default' - job_name: 'openimserver-openim-rpc-user' static_configs: - - targets: [ '${OPENIM_SERVER_ADDRESS}:${USER_PROM_PORT}' ] + - targets: [ '${DOCKER_BRIDGE_GATEWAY}:${USER_PROM_PORT}' ] labels: namespace: 'default' diff --git a/go.mod b/go.mod index 0d614c614..4e31c8cb0 100644 --- a/go.mod +++ b/go.mod @@ -34,10 +34,11 @@ require github.com/google/uuid v1.3.1 require ( github.com/IBM/sarama v1.41.3 github.com/OpenIMSDK/protocol v0.0.37 - github.com/OpenIMSDK/tools v0.0.20 + github.com/OpenIMSDK/tools v0.0.21 github.com/aliyun/aliyun-oss-go-sdk v2.2.9+incompatible github.com/go-redis/redis v6.15.9+incompatible github.com/redis/go-redis/v9 v9.2.1 + github.com/stathat/consistent v1.0.0 github.com/tencentyun/cos-go-sdk-v5 v0.7.45 go.uber.org/automaxprocs v1.5.3 golang.org/x/sync v0.4.0 @@ -141,6 +142,7 @@ require ( gopkg.in/src-d/go-billy.v4 v4.3.2 // indirect gopkg.in/warnings.v0 v0.1.2 // indirect gorm.io/gorm v1.23.8 // indirect + stathat.com/c/consistent v1.0.0 // indirect ) require ( diff --git a/go.sum b/go.sum index 322f59150..cdd9ead1c 100644 --- a/go.sum +++ b/go.sum @@ -20,8 +20,8 @@ github.com/IBM/sarama v1.41.3 h1:MWBEJ12vHC8coMjdEXFq/6ftO6DUZnQlFYcxtOJFa7c= github.com/IBM/sarama v1.41.3/go.mod h1:Xxho9HkHd4K/MDUo/T/sOqwtX/17D33++E9Wib6hUdQ= github.com/OpenIMSDK/protocol v0.0.37 h1:sVTnb3TjMQ3PG4m3gDI/LAM1Hkk9nI6gf7lf7UpscnI= github.com/OpenIMSDK/protocol v0.0.37/go.mod h1:F25dFrwrIx3lkNoiuf6FkCfxuwf8L4Z8UIsdTHP/r0Y= -github.com/OpenIMSDK/tools v0.0.20 h1:zBTjQZRJ5lR1FIzP9mtWyAvh5dKsmJXQugi4p8X/97k= -github.com/OpenIMSDK/tools v0.0.20/go.mod h1:eg+q4A34Qmu73xkY0mt37FHGMCMfC6CtmOnm0kFEGFI= +github.com/OpenIMSDK/tools v0.0.21 h1:iTapc2mIEVH/xl5Nd6jfwPub11Pgp44tVcE1rjB3a48= +github.com/OpenIMSDK/tools v0.0.21/go.mod h1:eg+q4A34Qmu73xkY0mt37FHGMCMfC6CtmOnm0kFEGFI= github.com/QcloudApi/qcloud_sign_golang v0.0.0-20141224014652-e4130a326409/go.mod h1:1pk82RBxDY/JZnPQrtqHlUFfCctgdorsd9M06fMynOM= github.com/alcortesm/tgz v0.0.0-20161220082320-9c5fe88206d7 h1:uSoVVbwJiQipAclBbw+8quDsfcvFjOpI5iCf4p/cqCs= github.com/alcortesm/tgz v0.0.0-20161220082320-9c5fe88206d7/go.mod h1:6zEj6s6u/ghQa61ZWa/C2Aw3RkjiTBOix7dkqa1VLIs= @@ -308,6 +308,8 @@ github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= github.com/src-d/gcfg v1.4.0 h1:xXbNR5AlLSA315x2UO+fTSSAXCDf+Ar38/6oyGbDKQ4= github.com/src-d/gcfg v1.4.0/go.mod h1:p/UMsR43ujA89BJY9duynAwIpvqEujIH/jFlfL7jWoI= +github.com/stathat/consistent v1.0.0 h1:ZFJ1QTRn8npNBKW065raSZ8xfOqhpb8vLOkfp4CcL/U= +github.com/stathat/consistent v1.0.0/go.mod h1:uajTPbgSygZBJ+V+0mY7meZ8i0XAcZs7AQ6V121XSxw= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.2.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= @@ -536,3 +538,5 @@ gotest.tools v2.2.0+incompatible/go.mod h1:DsYFclhRJ6vuDpmuTbkuFWG+y2sxOXAzmJt81 honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= rsc.io/pdf v0.1.1/go.mod h1:n8OzWcQ6Sp37PL01nO98y4iUCRdTGarVfzxY20ICaU4= +stathat.com/c/consistent v1.0.0 h1:ezyc51EGcRPJUxfHGSgJjWzJdj3NiMU9pNfLNGiXV0c= +stathat.com/c/consistent v1.0.0/go.mod h1:QkzMWzcbB+yQBL2AttO6sgsQS/JSTapcDISJalmCDS0= diff --git a/internal/msggateway/n_ws_server.go b/internal/msggateway/n_ws_server.go index 737d5db14..70c2f8fe0 100644 --- a/internal/msggateway/n_ws_server.go +++ b/internal/msggateway/n_ws_server.go @@ -288,12 +288,13 @@ func (ws *WsServer) registerClient(client *Client) { } wg := sync.WaitGroup{} - wg.Add(1) - go func() { - defer wg.Done() - _ = ws.sendUserOnlineInfoToOtherNode(client.ctx, client) - }() - + if config.Config.Envs.Discovery == "zookeeper" { + wg.Add(1) + go func() { + defer wg.Done() + _ = ws.sendUserOnlineInfoToOtherNode(client.ctx, client) + }() + } wg.Add(1) go func() { defer wg.Done() diff --git a/internal/push/push_handler.go b/internal/push/push_handler.go index a1a9ff08e..c91206ecc 100644 --- a/internal/push/push_handler.go +++ b/internal/push/push_handler.go @@ -67,13 +67,14 @@ func (c *ConsumerHandler) handleMs2PsChat(ctx context.Context, msg []byte) { case constant.SuperGroupChatType: err = c.pusher.Push2SuperGroup(ctx, pbData.MsgData.GroupID, pbData.MsgData) default: - var pushUserIDs []string - if pbData.MsgData.SendID != pbData.MsgData.RecvID { - pushUserIDs = []string{pbData.MsgData.SendID, pbData.MsgData.RecvID} + var pushUserIDList []string + isSenderSync := utils.GetSwitchFromOptions(pbData.MsgData.Options, constant.IsSenderSync) + if !isSenderSync || pbData.MsgData.SendID == pbData.MsgData.RecvID { + pushUserIDList = append(pushUserIDList, pbData.MsgData.RecvID) } else { - pushUserIDs = []string{pbData.MsgData.SendID} + pushUserIDList = append(pushUserIDList, pbData.MsgData.RecvID, pbData.MsgData.SendID) } - err = c.pusher.Push2User(ctx, pushUserIDs, pbData.MsgData) + err = c.pusher.Push2User(ctx, pushUserIDList, pbData.MsgData) } if err != nil { if err == errNoOfflinePusher { diff --git a/internal/push/push_rpc_server.go b/internal/push/push_rpc_server.go index 188ddc0e1..9e66f8f73 100644 --- a/internal/push/push_rpc_server.go +++ b/internal/push/push_rpc_server.go @@ -16,9 +16,8 @@ package push import ( "context" - "sync" - "github.com/OpenIMSDK/tools/utils" + "sync" "google.golang.org/grpc" diff --git a/internal/push/push_to_client.go b/internal/push/push_to_client.go index 2ee8c457f..12b78ea2d 100644 --- a/internal/push/push_to_client.go +++ b/internal/push/push_to_client.go @@ -18,6 +18,7 @@ import ( "context" "encoding/json" "errors" + "google.golang.org/grpc" "sync" "golang.org/x/sync/errgroup" @@ -142,6 +143,47 @@ func (p *Pusher) UnmarshalNotificationElem(bytes []byte, t any) error { return json.Unmarshal([]byte(notification.Detail), t) } +/* +k8s deployment,offline push group messages function +*/ +func (p *Pusher) k8sOfflinePush2SuperGroup(ctx context.Context, groupID string, msg *sdkws.MsgData, wsResults []*msggateway.SingleMsgToUserResults) error { + + var needOfflinePushUserIDs []string + for _, v := range wsResults { + if !v.OnlinePush { + needOfflinePushUserIDs = append(needOfflinePushUserIDs, v.UserID) + } + } + if len(needOfflinePushUserIDs) > 0 { + var offlinePushUserIDs []string + err := callbackOfflinePush(ctx, needOfflinePushUserIDs, msg, &offlinePushUserIDs) + if err != nil { + return err + } + + if len(offlinePushUserIDs) > 0 { + needOfflinePushUserIDs = offlinePushUserIDs + } + if msg.ContentType != constant.SignalingNotification { + resp, err := p.conversationRpcClient.Client.GetConversationOfflinePushUserIDs( + ctx, + &conversation.GetConversationOfflinePushUserIDsReq{ConversationID: utils.GenGroupConversationID(groupID), UserIDs: needOfflinePushUserIDs}, + ) + if err != nil { + return err + } + if len(resp.UserIDs) > 0 { + err = p.offlinePushMsg(ctx, groupID, msg, resp.UserIDs) + if err != nil { + log.ZError(ctx, "offlinePushMsg failed", err, "groupID", groupID, "msg", msg) + return err + } + } + } + + } + return nil +} func (p *Pusher) Push2SuperGroup(ctx context.Context, groupID string, msg *sdkws.MsgData) (err error) { log.ZDebug(ctx, "Get super group msg from msg_transfer and push msg", "msg", msg.String(), "groupID", groupID) var pushToUserIDs []string @@ -205,7 +247,10 @@ func (p *Pusher) Push2SuperGroup(ctx context.Context, groupID string, msg *sdkws log.ZDebug(ctx, "get conn and online push success", "result", wsResults, "msg", msg) isOfflinePush := utils.GetSwitchFromOptions(msg.Options, constant.IsOfflinePush) - if isOfflinePush { + if isOfflinePush && config.Config.Envs.Discovery == "k8s" { + return p.k8sOfflinePush2SuperGroup(ctx, groupID, msg, wsResults) + } + if isOfflinePush && config.Config.Envs.Discovery == "zookeeper" { var ( onlineSuccessUserIDs = []string{msg.SendID} webAndPcBackgroundUserIDs []string @@ -239,14 +284,7 @@ func (p *Pusher) Push2SuperGroup(ctx context.Context, groupID string, msg *sdkws } needOfflinePushUserIDs := utils.DifferenceString(onlineSuccessUserIDs, pushToUserIDs) - if msg.ContentType != constant.SignalingNotification { - notNotificationUserIDs, err := p.conversationLocalCache.GetRecvMsgNotNotifyUserIDs(ctx, groupID) - if err != nil { - return err - } - needOfflinePushUserIDs = utils.SliceSub(needOfflinePushUserIDs, notNotificationUserIDs) - } // Use offline push messaging if len(needOfflinePushUserIDs) > 0 { var offlinePushUserIDs []string @@ -258,30 +296,89 @@ func (p *Pusher) Push2SuperGroup(ctx context.Context, groupID string, msg *sdkws if len(offlinePushUserIDs) > 0 { needOfflinePushUserIDs = offlinePushUserIDs } - resp, err := p.conversationRpcClient.Client.GetConversationOfflinePushUserIDs( - ctx, - &conversation.GetConversationOfflinePushUserIDsReq{ConversationID: utils.GenGroupConversationID(groupID), UserIDs: needOfflinePushUserIDs}, - ) - if err != nil { - return err - } - if len(resp.UserIDs) > 0 { - err = p.offlinePushMsg(ctx, groupID, msg, resp.UserIDs) + if msg.ContentType != constant.SignalingNotification { + resp, err := p.conversationRpcClient.Client.GetConversationOfflinePushUserIDs( + ctx, + &conversation.GetConversationOfflinePushUserIDsReq{ConversationID: utils.GenGroupConversationID(groupID), UserIDs: needOfflinePushUserIDs}, + ) if err != nil { - log.ZError(ctx, "offlinePushMsg failed", err, "groupID", groupID, "msg", msg) return err } - if _, err := p.GetConnsAndOnlinePush(ctx, msg, utils.IntersectString(resp.UserIDs, webAndPcBackgroundUserIDs)); err != nil { - log.ZError(ctx, "offlinePushMsg failed", err, "groupID", groupID, "msg", msg, "userIDs", utils.IntersectString(needOfflinePushUserIDs, webAndPcBackgroundUserIDs)) - return err + if len(resp.UserIDs) > 0 { + err = p.offlinePushMsg(ctx, groupID, msg, resp.UserIDs) + if err != nil { + log.ZError(ctx, "offlinePushMsg failed", err, "groupID", groupID, "msg", msg) + return err + } + if _, err := p.GetConnsAndOnlinePush(ctx, msg, utils.IntersectString(resp.UserIDs, webAndPcBackgroundUserIDs)); err != nil { + log.ZError(ctx, "offlinePushMsg failed", err, "groupID", groupID, "msg", msg, "userIDs", utils.IntersectString(needOfflinePushUserIDs, webAndPcBackgroundUserIDs)) + return err + } } } + } } return nil } +func (p *Pusher) k8sOnlinePush(ctx context.Context, msg *sdkws.MsgData, pushToUserIDs []string) (wsResults []*msggateway.SingleMsgToUserResults, err error) { + var usersHost = make(map[string][]string) + for _, v := range pushToUserIDs { + tHost, err := p.discov.GetUserIdHashGatewayHost(ctx, v) + if err != nil { + log.ZError(ctx, "get msggateway hash error", err) + return nil, err + } + tUsers, tbl := usersHost[tHost] + if tbl { + tUsers = append(tUsers, v) + usersHost[tHost] = tUsers + } else { + usersHost[tHost] = []string{v} + } + } + log.ZDebug(ctx, "genUsers send hosts struct:", "usersHost", usersHost) + var usersConns = make(map[*grpc.ClientConn][]string) + for host, userIds := range usersHost { + tconn, _ := p.discov.GetConn(ctx, host) + usersConns[tconn] = userIds + } + var ( + mu sync.Mutex + wg = errgroup.Group{} + maxWorkers = config.Config.Push.MaxConcurrentWorkers + ) + if maxWorkers < 3 { + maxWorkers = 3 + } + wg.SetLimit(maxWorkers) + for conn, userIds := range usersConns { + tcon := conn + tuserIds := userIds + wg.Go(func() error { + input := &msggateway.OnlineBatchPushOneMsgReq{MsgData: msg, PushToUserIDs: tuserIds} + msgClient := msggateway.NewMsgGatewayClient(tcon) + reply, err := msgClient.SuperGroupOnlineBatchPushOneMsg(ctx, input) + if err != nil { + return nil + } + log.ZDebug(ctx, "push result", "reply", reply) + if reply != nil && reply.SinglePushResult != nil { + mu.Lock() + wsResults = append(wsResults, reply.SinglePushResult...) + mu.Unlock() + } + return nil + }) + } + _ = wg.Wait() + return wsResults, nil +} func (p *Pusher) GetConnsAndOnlinePush(ctx context.Context, msg *sdkws.MsgData, pushToUserIDs []string) (wsResults []*msggateway.SingleMsgToUserResults, err error) { + if config.Config.Envs.Discovery == "k8s" { + return p.k8sOnlinePush(ctx, msg, pushToUserIDs) + } conns, err := p.discov.GetConns(ctx, config.Config.RpcRegisterName.OpenImMessageGatewayName) log.ZDebug(ctx, "get gateway conn", "conn length", len(conns)) if err != nil { diff --git a/pkg/common/discoveryregister/kubernetes/kubernetes.go b/pkg/common/discoveryregister/kubernetes/kubernetes.go index cd5fb0a36..2ff1539e6 100644 --- a/pkg/common/discoveryregister/kubernetes/kubernetes.go +++ b/pkg/common/discoveryregister/kubernetes/kubernetes.go @@ -18,6 +18,7 @@ import ( "context" "errors" "fmt" + "github.com/stathat/consistent" "os" "strconv" "strings" @@ -31,51 +32,54 @@ import ( // K8sDR represents the Kubernetes service discovery and registration client. type K8sDR struct { - options []grpc.DialOption - rpcRegisterAddr string + options []grpc.DialOption + rpcRegisterAddr string + gatewayHostConsistent *consistent.Consistent } -// NewK8sDiscoveryRegister creates a new instance of K8sDR for Kubernetes service discovery and registration. func NewK8sDiscoveryRegister() (discoveryregistry.SvcDiscoveryRegistry, error) { - - return &K8sDR{}, nil + gatewayConsistent := consistent.New() + gatewayHosts := getMsgGatewayHost(context.Background()) + for _, v := range gatewayHosts { + gatewayConsistent.Add(v) + } + return &K8sDR{gatewayHostConsistent: gatewayConsistent}, nil } -// Register registers a service with Kubernetes. func (cli *K8sDR) Register(serviceName, host string, port int, opts ...grpc.DialOption) error { if serviceName != config.Config.RpcRegisterName.OpenImMessageGatewayName { cli.rpcRegisterAddr = serviceName } else { - cli.rpcRegisterAddr = cli.getSelfHost(context.Background()) + cli.rpcRegisterAddr = getSelfHost(context.Background()) } return nil } - -// UnRegister removes a service registration from Kubernetes. func (cli *K8sDR) UnRegister() error { return nil } - -// CreateRpcRootNodes creates root nodes for RPC in Kubernetes. func (cli *K8sDR) CreateRpcRootNodes(serviceNames []string) error { return nil } - -// RegisterConf2Registry registers a configuration to the registry. func (cli *K8sDR) RegisterConf2Registry(key string, conf []byte) error { return nil } -// GetConfFromRegistry retrieves a configuration from the registry. func (cli *K8sDR) GetConfFromRegistry(key string) ([]byte, error) { + return nil, nil } - -func (cli *K8sDR) getSelfHost(ctx context.Context) string { +func (cli *K8sDR) GetUserIdHashGatewayHost(ctx context.Context, userId string) (string, error) { + host, err := cli.gatewayHostConsistent.Get(userId) + if err != nil { + log.ZError(ctx, "GetUserIdHashGatewayHost error", err) + } + return host, err +} +func getSelfHost(ctx context.Context) string { port := 88 instance := "openimserver" selfPodName := os.Getenv("MY_POD_NAME") @@ -95,26 +99,8 @@ func (cli *K8sDR) getSelfHost(ctx context.Context) string { return host } -// GetConns returns a list of gRPC client connections for a given service. -func (cli *K8sDR) GetConns(ctx context.Context, serviceName string, opts ...grpc.DialOption) ([]*grpc.ClientConn, error) { - if serviceName != config.Config.RpcRegisterName.OpenImMessageGatewayName { - conn, err := grpc.DialContext(ctx, serviceName, append(cli.options, opts...)...) - return []*grpc.ClientConn{conn}, err - } - var ret []*grpc.ClientConn - gatewayHosts := cli.getMsgGatewayHost(ctx) - for _, host := range gatewayHosts { - conn, err := grpc.DialContext(ctx, host, append(cli.options, opts...)...) - if err != nil { - return nil, err - } - ret = append(ret, conn) - } - return ret, nil -} - // like openimserver-openim-msggateway-0.openimserver-openim-msggateway-headless.openim-lin.svc.cluster.local:88 -func (cli *K8sDR) getMsgGatewayHost(ctx context.Context) []string { +func getMsgGatewayHost(ctx context.Context) []string { port := 88 instance := "openimserver" selfPodName := os.Getenv("MY_POD_NAME") @@ -135,40 +121,48 @@ func (cli *K8sDR) getMsgGatewayHost(ctx context.Context) []string { ret = append(ret, host) } log.ZInfo(ctx, "getMsgGatewayHost", "instance", instance, "selfPodName", selfPodName, "replicas", replicas, "ns", ns, "ret", ret) - return ret } +func (cli *K8sDR) GetConns(ctx context.Context, serviceName string, opts ...grpc.DialOption) ([]*grpc.ClientConn, error) { -// GetConn returns a single gRPC client connection for a given service. + if serviceName != config.Config.RpcRegisterName.OpenImMessageGatewayName { + conn, err := grpc.DialContext(ctx, serviceName, append(cli.options, opts...)...) + return []*grpc.ClientConn{conn}, err + } else { + var ret []*grpc.ClientConn + gatewayHosts := getMsgGatewayHost(ctx) + for _, host := range gatewayHosts { + conn, err := grpc.DialContext(ctx, host, append(cli.options, opts...)...) + if err != nil { + return nil, err + } else { + ret = append(ret, conn) + } + } + return ret, nil + } +} func (cli *K8sDR) GetConn(ctx context.Context, serviceName string, opts ...grpc.DialOption) (*grpc.ClientConn, error) { + return grpc.DialContext(ctx, serviceName, append(cli.options, opts...)...) } -// GetSelfConnTarget returns the connection target of the client itself. func (cli *K8sDR) GetSelfConnTarget() string { + return cli.rpcRegisterAddr } - -// AddOption adds gRPC dial options to the client. func (cli *K8sDR) AddOption(opts ...grpc.DialOption) { cli.options = append(cli.options, opts...) } - -// CloseConn closes a given gRPC client connection. func (cli *K8sDR) CloseConn(conn *grpc.ClientConn) { conn.Close() } -// do not use this method for call rpc. +// do not use this method for call rpc func (cli *K8sDR) GetClientLocalConns() map[string][]*grpc.ClientConn { fmt.Println("should not call this function!!!!!!!!!!!!!!!!!!!!!!!!!") - return nil } - -// Close closes the K8sDR client. func (cli *K8sDR) Close() { - - // Close any open resources here (if applicable) return } diff --git a/scripts/check-all.sh b/scripts/check-all.sh index a438ac26b..efaa2ccb7 100755 --- a/scripts/check-all.sh +++ b/scripts/check-all.sh @@ -33,15 +33,15 @@ openim::log::info "\n# Begin to check all openim service" # OpenIM status # Elegant printing function print_services_and_ports() { - local -n service_names=$1 - local -n service_ports=$2 + service_names=("$1[@]") + service_ports=("$2[@]") echo "+-------------------------+----------+" echo "| Service Name | Port |" echo "+-------------------------+----------+" - for index in "${!service_names[@]}"; do - printf "| %-23s | %-8s |\n" "${service_names[$index]}" "${service_ports[$index]}" + for index in "${!service_names}"; do + printf "| %-23s | %-8s |\n" "${!service_names[$index]}" "${!service_ports[$index]}" done echo "+-------------------------+----------+"