diff --git a/pkg/common/discoveryregister/kubernetes/kubernetes.go b/pkg/common/discoveryregister/kubernetes/kubernetes.go index cd5fb0a36..2ff1539e6 100644 --- a/pkg/common/discoveryregister/kubernetes/kubernetes.go +++ b/pkg/common/discoveryregister/kubernetes/kubernetes.go @@ -18,6 +18,7 @@ import ( "context" "errors" "fmt" + "github.com/stathat/consistent" "os" "strconv" "strings" @@ -31,51 +32,54 @@ import ( // K8sDR represents the Kubernetes service discovery and registration client. type K8sDR struct { - options []grpc.DialOption - rpcRegisterAddr string + options []grpc.DialOption + rpcRegisterAddr string + gatewayHostConsistent *consistent.Consistent } -// NewK8sDiscoveryRegister creates a new instance of K8sDR for Kubernetes service discovery and registration. func NewK8sDiscoveryRegister() (discoveryregistry.SvcDiscoveryRegistry, error) { - - return &K8sDR{}, nil + gatewayConsistent := consistent.New() + gatewayHosts := getMsgGatewayHost(context.Background()) + for _, v := range gatewayHosts { + gatewayConsistent.Add(v) + } + return &K8sDR{gatewayHostConsistent: gatewayConsistent}, nil } -// Register registers a service with Kubernetes. func (cli *K8sDR) Register(serviceName, host string, port int, opts ...grpc.DialOption) error { if serviceName != config.Config.RpcRegisterName.OpenImMessageGatewayName { cli.rpcRegisterAddr = serviceName } else { - cli.rpcRegisterAddr = cli.getSelfHost(context.Background()) + cli.rpcRegisterAddr = getSelfHost(context.Background()) } return nil } - -// UnRegister removes a service registration from Kubernetes. func (cli *K8sDR) UnRegister() error { return nil } - -// CreateRpcRootNodes creates root nodes for RPC in Kubernetes. func (cli *K8sDR) CreateRpcRootNodes(serviceNames []string) error { return nil } - -// RegisterConf2Registry registers a configuration to the registry. func (cli *K8sDR) RegisterConf2Registry(key string, conf []byte) error { return nil } -// GetConfFromRegistry retrieves a configuration from the registry. func (cli *K8sDR) GetConfFromRegistry(key string) ([]byte, error) { + return nil, nil } - -func (cli *K8sDR) getSelfHost(ctx context.Context) string { +func (cli *K8sDR) GetUserIdHashGatewayHost(ctx context.Context, userId string) (string, error) { + host, err := cli.gatewayHostConsistent.Get(userId) + if err != nil { + log.ZError(ctx, "GetUserIdHashGatewayHost error", err) + } + return host, err +} +func getSelfHost(ctx context.Context) string { port := 88 instance := "openimserver" selfPodName := os.Getenv("MY_POD_NAME") @@ -95,26 +99,8 @@ func (cli *K8sDR) getSelfHost(ctx context.Context) string { return host } -// GetConns returns a list of gRPC client connections for a given service. -func (cli *K8sDR) GetConns(ctx context.Context, serviceName string, opts ...grpc.DialOption) ([]*grpc.ClientConn, error) { - if serviceName != config.Config.RpcRegisterName.OpenImMessageGatewayName { - conn, err := grpc.DialContext(ctx, serviceName, append(cli.options, opts...)...) - return []*grpc.ClientConn{conn}, err - } - var ret []*grpc.ClientConn - gatewayHosts := cli.getMsgGatewayHost(ctx) - for _, host := range gatewayHosts { - conn, err := grpc.DialContext(ctx, host, append(cli.options, opts...)...) - if err != nil { - return nil, err - } - ret = append(ret, conn) - } - return ret, nil -} - // like openimserver-openim-msggateway-0.openimserver-openim-msggateway-headless.openim-lin.svc.cluster.local:88 -func (cli *K8sDR) getMsgGatewayHost(ctx context.Context) []string { +func getMsgGatewayHost(ctx context.Context) []string { port := 88 instance := "openimserver" selfPodName := os.Getenv("MY_POD_NAME") @@ -135,40 +121,48 @@ func (cli *K8sDR) getMsgGatewayHost(ctx context.Context) []string { ret = append(ret, host) } log.ZInfo(ctx, "getMsgGatewayHost", "instance", instance, "selfPodName", selfPodName, "replicas", replicas, "ns", ns, "ret", ret) - return ret } +func (cli *K8sDR) GetConns(ctx context.Context, serviceName string, opts ...grpc.DialOption) ([]*grpc.ClientConn, error) { -// GetConn returns a single gRPC client connection for a given service. + if serviceName != config.Config.RpcRegisterName.OpenImMessageGatewayName { + conn, err := grpc.DialContext(ctx, serviceName, append(cli.options, opts...)...) + return []*grpc.ClientConn{conn}, err + } else { + var ret []*grpc.ClientConn + gatewayHosts := getMsgGatewayHost(ctx) + for _, host := range gatewayHosts { + conn, err := grpc.DialContext(ctx, host, append(cli.options, opts...)...) + if err != nil { + return nil, err + } else { + ret = append(ret, conn) + } + } + return ret, nil + } +} func (cli *K8sDR) GetConn(ctx context.Context, serviceName string, opts ...grpc.DialOption) (*grpc.ClientConn, error) { + return grpc.DialContext(ctx, serviceName, append(cli.options, opts...)...) } -// GetSelfConnTarget returns the connection target of the client itself. func (cli *K8sDR) GetSelfConnTarget() string { + return cli.rpcRegisterAddr } - -// AddOption adds gRPC dial options to the client. func (cli *K8sDR) AddOption(opts ...grpc.DialOption) { cli.options = append(cli.options, opts...) } - -// CloseConn closes a given gRPC client connection. func (cli *K8sDR) CloseConn(conn *grpc.ClientConn) { conn.Close() } -// do not use this method for call rpc. +// do not use this method for call rpc func (cli *K8sDR) GetClientLocalConns() map[string][]*grpc.ClientConn { fmt.Println("should not call this function!!!!!!!!!!!!!!!!!!!!!!!!!") - return nil } - -// Close closes the K8sDR client. func (cli *K8sDR) Close() { - - // Close any open resources here (if applicable) return }