From c5c5b2fd8ea3a6cc5c73b7b5860d32285c0c29e5 Mon Sep 17 00:00:00 2001 From: xuexihuang <1339326187@qq.com> Date: Sun, 17 Dec 2023 17:40:04 +0800 Subject: [PATCH] support multipe msggateway services in k8s deployments (#1565) * feature:support multi msggateway * feature:support multi msggateway --------- Co-authored-by: lin.huang --- .../discoveryregister/discoveryregister.go | 72 ++++++++++++++++++- 1 file changed, 69 insertions(+), 3 deletions(-) diff --git a/pkg/common/discoveryregister/discoveryregister.go b/pkg/common/discoveryregister/discoveryregister.go index c204184ff..62c1f4a31 100644 --- a/pkg/common/discoveryregister/discoveryregister.go +++ b/pkg/common/discoveryregister/discoveryregister.go @@ -4,6 +4,9 @@ import ( "context" "errors" "fmt" + "os" + "strconv" + "strings" "time" "github.com/OpenIMSDK/tools/discoveryregistry" @@ -43,7 +46,12 @@ func NewK8sDiscoveryRegister() (discoveryregistry.SvcDiscoveryRegistry, error) { } func (cli *K8sDR) Register(serviceName, host string, port int, opts ...grpc.DialOption) error { - cli.rpcRegisterAddr = serviceName + if serviceName != config.Config.RpcRegisterName.OpenImMessageGatewayName { + cli.rpcRegisterAddr = serviceName + } else { + cli.rpcRegisterAddr = cli.getSelfHost(context.Background()) + } + return nil } func (cli *K8sDR) UnRegister() error { @@ -63,10 +71,68 @@ func (cli *K8sDR) GetConfFromRegistry(key string) ([]byte, error) { return nil, nil } +func (cli *K8sDR) getSelfHost(ctx context.Context) string { + port := 88 + instance := "openimserver" + selfPodName := os.Getenv("MY_POD_NAME") + ns := os.Getenv("MY_POD_NAMESPACE") + statefuleIndex := 0 + gatewayEnds := strings.Split(config.Config.RpcRegisterName.OpenImMessageGatewayName, ":") + if len(gatewayEnds) != 2 { + log.ZError(ctx, "msggateway RpcRegisterName is error:config.Config.RpcRegisterName.OpenImMessageGatewayName", errors.New("config error")) + } else { + port, _ = strconv.Atoi(gatewayEnds[1]) + } + podInfo := strings.Split(selfPodName, "-") + instance = podInfo[0] + count := len(podInfo) + statefuleIndex, _ = strconv.Atoi(podInfo[count-1]) + host := fmt.Sprintf("%s-openim-msggateway-%d.%s-openim-msggateway-headless.%s.svc.cluster.local:%d", instance, statefuleIndex, instance, ns, port) + return host +} + +// like openimserver-openim-msggateway-0.openimserver-openim-msggateway-headless.openim-lin.svc.cluster.local:88 +func (cli *K8sDR) getMsgGatewayHost(ctx context.Context) []string { + port := 88 + instance := "openimserver" + selfPodName := os.Getenv("MY_POD_NAME") + replicas := os.Getenv("MY_MSGGATEWAY_REPLICACOUNT") + ns := os.Getenv("MY_POD_NAMESPACE") + gatewayEnds := strings.Split(config.Config.RpcRegisterName.OpenImMessageGatewayName, ":") + if len(gatewayEnds) != 2 { + log.ZError(ctx, "msggateway RpcRegisterName is error:config.Config.RpcRegisterName.OpenImMessageGatewayName", errors.New("config error")) + } else { + port, _ = strconv.Atoi(gatewayEnds[1]) + } + nReplicas, _ := strconv.Atoi(replicas) + podInfo := strings.Split(selfPodName, "-") + instance = podInfo[0] + var ret []string + for i := 0; i < nReplicas; i++ { + host := fmt.Sprintf("%s-openim-msggateway-%d.%s-openim-msggateway-headless.%s.svc.cluster.local:%d", instance, i, instance, ns, port) + ret = append(ret, host) + } + log.ZInfo(ctx, "getMsgGatewayHost", "instance", instance, "selfPodName", selfPodName, "replicas", replicas, "ns", ns, "ret", ret) + return ret +} func (cli *K8sDR) GetConns(ctx context.Context, serviceName string, opts ...grpc.DialOption) ([]*grpc.ClientConn, error) { - conn, err := grpc.DialContext(ctx, serviceName, append(cli.options, opts...)...) - return []*grpc.ClientConn{conn}, err + if serviceName != config.Config.RpcRegisterName.OpenImMessageGatewayName { + conn, err := grpc.DialContext(ctx, serviceName, append(cli.options, opts...)...) + return []*grpc.ClientConn{conn}, err + } else { + var ret []*grpc.ClientConn + gatewayHosts := cli.getMsgGatewayHost(ctx) + for _, host := range gatewayHosts { + conn, err := grpc.DialContext(ctx, host, append(cli.options, opts...)...) + if err != nil { + return nil, err + } else { + ret = append(ret, conn) + } + } + return ret, nil + } } func (cli *K8sDR) GetConn(ctx context.Context, serviceName string, opts ...grpc.DialOption) (*grpc.ClientConn, error) {