From 3dd019efb0c031b4563cd19cc421d6ab449bdb3f Mon Sep 17 00:00:00 2001 From: Xinwei Xiong <3293172751@qq.com> Date: Mon, 18 Dec 2023 10:06:13 +0800 Subject: [PATCH] Update kubernetes.go --- .../kubernetes/kubernetes.go | 68 ++++++++++++++++++- 1 file changed, 65 insertions(+), 3 deletions(-) diff --git a/pkg/common/discoveryregister/kubernetes/kubernetes.go b/pkg/common/discoveryregister/kubernetes/kubernetes.go index dbd47b7df..6508ea05d 100644 --- a/pkg/common/discoveryregister/kubernetes/kubernetes.go +++ b/pkg/common/discoveryregister/kubernetes/kubernetes.go @@ -37,7 +37,11 @@ func NewK8sDiscoveryRegister() (discoveryregistry.SvcDiscoveryRegistry, error) { // Register registers a service with Kubernetes. func (cli *K8sDR) Register(serviceName, host string, port int, opts ...grpc.DialOption) error { - cli.rpcRegisterAddr = serviceName + if serviceName != config.Config.RpcRegisterName.OpenImMessageGatewayName { + cli.rpcRegisterAddr = serviceName + } else { + cli.rpcRegisterAddr = cli.getSelfHost(context.Background()) + } return nil } @@ -65,10 +69,68 @@ func (cli *K8sDR) GetConfFromRegistry(key string) ([]byte, error) { return nil, nil } +func (cli *K8sDR) getSelfHost(ctx context.Context) string { + port := 88 + instance := "openimserver" + selfPodName := os.Getenv("MY_POD_NAME") + ns := os.Getenv("MY_POD_NAMESPACE") + statefuleIndex := 0 + gatewayEnds := strings.Split(config.Config.RpcRegisterName.OpenImMessageGatewayName, ":") + if len(gatewayEnds) != 2 { + log.ZError(ctx, "msggateway RpcRegisterName is error:config.Config.RpcRegisterName.OpenImMessageGatewayName", errors.New("config error")) + } else { + port, _ = strconv.Atoi(gatewayEnds[1]) + } + podInfo := strings.Split(selfPodName, "-") + instance = podInfo[0] + count := len(podInfo) + statefuleIndex, _ = strconv.Atoi(podInfo[count-1]) + host := fmt.Sprintf("%s-openim-msggateway-%d.%s-openim-msggateway-headless.%s.svc.cluster.local:%d", instance, statefuleIndex, instance, ns, port) + return host +} + // GetConns returns a list of gRPC client connections for a given service. func (cli *K8sDR) GetConns(ctx context.Context, serviceName string, opts ...grpc.DialOption) ([]*grpc.ClientConn, error) { - conn, err := grpc.DialContext(ctx, serviceName, append(cli.options, opts...)...) - return []*grpc.ClientConn{conn}, err + if serviceName != config.Config.RpcRegisterName.OpenImMessageGatewayName { + conn, err := grpc.DialContext(ctx, serviceName, append(cli.options, opts...)...) + return []*grpc.ClientConn{conn}, err + } + var ret []*grpc.ClientConn + gatewayHosts := cli.getMsgGatewayHost(ctx) + for _, host := range gatewayHosts { + conn, err := grpc.DialContext(ctx, host, append(cli.options, opts...)...) + if err != nil { + return nil, err + } + ret = append(ret, conn) + } + return ret, nil +} + +// like openimserver-openim-msggateway-0.openimserver-openim-msggateway-headless.openim-lin.svc.cluster.local:88 +func (cli *K8sDR) getMsgGatewayHost(ctx context.Context) []string { + port := 88 + instance := "openimserver" + selfPodName := os.Getenv("MY_POD_NAME") + replicas := os.Getenv("MY_MSGGATEWAY_REPLICACOUNT") + ns := os.Getenv("MY_POD_NAMESPACE") + gatewayEnds := strings.Split(config.Config.RpcRegisterName.OpenImMessageGatewayName, ":") + if len(gatewayEnds) != 2 { + log.ZError(ctx, "msggateway RpcRegisterName is error:config.Config.RpcRegisterName.OpenImMessageGatewayName", errors.New("config error")) + } else { + port, _ = strconv.Atoi(gatewayEnds[1]) + } + nReplicas, _ := strconv.Atoi(replicas) + podInfo := strings.Split(selfPodName, "-") + instance = podInfo[0] + var ret []string + for i := 0; i < nReplicas; i++ { + host := fmt.Sprintf("%s-openim-msggateway-%d.%s-openim-msggateway-headless.%s.svc.cluster.local:%d", instance, i, instance, ns, port) + ret = append(ret, host) + } + log.ZInfo(ctx, "getMsgGatewayHost", "instance", instance, "selfPodName", selfPodName, "replicas", replicas, "ns", ns, "ret", ret) + + return ret } // GetConn returns a single gRPC client connection for a given service.