From 13ca8fd1482256a739d3600b323c30c11b9e4f99 Mon Sep 17 00:00:00 2001 From: AndrewZuo01 Date: Mon, 15 Jan 2024 16:34:00 +0800 Subject: [PATCH] Your commit message here --- .../discoveryregister/direct/directconn.go | 136 ++++++++++++++++++ .../discoveryregister/discoveryregister.go | 3 + .../kubernetes/kubernetes.go | 20 +++ 3 files changed, 159 insertions(+) create mode 100644 pkg/common/discoveryregister/direct/directconn.go diff --git a/pkg/common/discoveryregister/direct/directconn.go b/pkg/common/discoveryregister/direct/directconn.go new file mode 100644 index 000000000..d826cfb67 --- /dev/null +++ b/pkg/common/discoveryregister/direct/directconn.go @@ -0,0 +1,136 @@ +package direct + +import ( + "context" + "errors" + "fmt" + "github.com/OpenIMSDK/tools/errs" + config2 "github.com/openimsdk/open-im-server/v3/pkg/common/config" + "google.golang.org/grpc" + "net" + "time" +) + +type ServiceAddresses map[string]string + +func getServiceAddresses() ServiceAddresses { + return ServiceAddresses{ + "OpenImUser": fmt.Sprintf(config2.Config.RpcRegisterName.OpenImUserName, config2.Config.RpcPort.OpenImUserPort[0]), + "OpenImFriend": fmt.Sprintf(config2.Config.RpcRegisterName.OpenImFriendName, config2.Config.RpcPort.OpenImFriendPort[0]), + "OpenImMessage": fmt.Sprintf(config2.Config.RpcRegisterName.OpenImMsgName, config2.Config.RpcPort.OpenImMessagePort[0]), + "OpenImMessageGateway": fmt.Sprintf(config2.Config.RpcRegisterName.OpenImMessageGatewayName, config2.Config.RpcPort.OpenImMessageGatewayPort[0]), + "OpenImGroup": fmt.Sprintf(config2.Config.RpcRegisterName.OpenImGroupName, config2.Config.RpcPort.OpenImGroupPort[0]), + "OpenImAuth": fmt.Sprintf(config2.Config.RpcRegisterName.OpenImAuthName, config2.Config.RpcPort.OpenImAuthPort[0]), + "OpenImPush": fmt.Sprintf(config2.Config.RpcRegisterName.OpenImPushName, config2.Config.RpcPort.OpenImPushPort[0]), + "OpenImConversation": fmt.Sprintf(config2.Config.RpcRegisterName.OpenImConversationName, config2.Config.RpcPort.OpenImConversationPort[0]), + "OpenImThird": fmt.Sprintf(config2.Config.RpcRegisterName.OpenImThirdName, config2.Config.RpcPort.OpenImThirdPort[0]), + } +} + +type ConnManager struct { + additionalOpts []grpc.DialOption + currentServiceAddress string + conns map[string]*grpc.ClientConn +} + +func (cm *ConnManager) GetClientLocalConns() map[string][]*grpc.ClientConn { + return nil +} + +func (cm *ConnManager) GetUserIdHashGatewayHost(ctx context.Context, userId string) (string, error) { + return "", nil +} + +func (cm *ConnManager) Register(serviceName, host string, port int, opts ...grpc.DialOption) error { + return nil +} + +func (cm *ConnManager) UnRegister() error { + return nil +} + +func (cm *ConnManager) CreateRpcRootNodes(serviceNames []string) error { + return nil +} + +func (cm *ConnManager) RegisterConf2Registry(key string, conf []byte) error { + return nil +} + +func (cm *ConnManager) GetConfFromRegistry(key string) ([]byte, error) { + return nil, nil +} + +func (cm *ConnManager) Close() { + +} + +func NewConnManager() (*ConnManager, error) { + return &ConnManager{ + conns: make(map[string]*grpc.ClientConn), + }, nil +} + +func (cm *ConnManager) GetConns(ctx context.Context, + serviceName string, opts ...grpc.DialOption) ([]*grpc.ClientConn, error) { + var connections []*grpc.ClientConn + for name, conn := range cm.conns { + if name == serviceName { + connections = append(connections, conn) + } + } + if len(connections) == 0 { + return nil, fmt.Errorf("no connections found for service: %s", serviceName) + } + return connections, nil +} + +func (cm *ConnManager) GetConn(ctx context.Context, serviceName string, opts ...grpc.DialOption) (*grpc.ClientConn, error) { + if conn, exists := cm.conns[serviceName]; exists { + return conn, nil + } + addresses := getServiceAddresses() + address, ok := addresses[serviceName] + if !ok { + return nil, errs.Wrap(errors.New("unknown service name"), "serviceName", serviceName) + } + + conn, err := dialService(address, opts...) + if err != nil { + return nil, err + } + cm.conns[serviceName] = conn + return conn, nil +} + +func (cm *ConnManager) GetSelfConnTarget() string { + return cm.currentServiceAddress +} + +func (cm *ConnManager) AddOption(opts ...grpc.DialOption) { + cm.additionalOpts = append(cm.additionalOpts, opts...) +} + +func (cm *ConnManager) CloseConn(conn *grpc.ClientConn) { + if conn != nil { + conn.Close() + } +} + +func dialService(address string, opts ...grpc.DialOption) (*grpc.ClientConn, error) { + options := append(opts, grpc.WithInsecure()) // Replace WithInsecure with proper security options + conn, err := grpc.DialContext(context.Background(), address, options...) + if err != nil { + return nil, err + } + return conn, nil +} + +func checkServiceHealth(address string) bool { + conn, err := net.DialTimeout("tcp", address, time.Second*3) + if err != nil { + return false + } + conn.Close() + return true +} diff --git a/pkg/common/discoveryregister/discoveryregister.go b/pkg/common/discoveryregister/discoveryregister.go index c14323027..eb4224ed1 100644 --- a/pkg/common/discoveryregister/discoveryregister.go +++ b/pkg/common/discoveryregister/discoveryregister.go @@ -16,6 +16,7 @@ package discoveryregister import ( "errors" + "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister/direct" "os" "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister/kubernetes" @@ -36,6 +37,8 @@ func NewDiscoveryRegister(envType string) (discoveryregistry.SvcDiscoveryRegistr return zookeeper.NewZookeeperDiscoveryRegister() case "k8s": return kubernetes.NewK8sDiscoveryRegister() + case "direct": + return direct.NewConnManager() default: return nil, errors.New("envType not correct") } diff --git a/pkg/common/discoveryregister/kubernetes/kubernetes.go b/pkg/common/discoveryregister/kubernetes/kubernetes.go index c10518056..e40db9f78 100644 --- a/pkg/common/discoveryregister/kubernetes/kubernetes.go +++ b/pkg/common/discoveryregister/kubernetes/kubernetes.go @@ -130,20 +130,40 @@ func getMsgGatewayHost(ctx context.Context) []string { // GetConns returns the gRPC client connections to the specified service. func (cli *K8sDR) GetConns(ctx context.Context, serviceName string, opts ...grpc.DialOption) ([]*grpc.ClientConn, error) { + // This conditional checks if the serviceName is not the OpenImMessageGatewayName. + // It seems to handle a special case for the OpenImMessageGateway. if serviceName != config.Config.RpcRegisterName.OpenImMessageGatewayName { + // DialContext creates a client connection to the given target (serviceName) using the specified context. + // 'cli.options' are likely default or common options for all connections in this struct. + // 'opts...' allows for additional gRPC dial options to be passed and used. conn, err := grpc.DialContext(ctx, serviceName, append(cli.options, opts...)...) + + // The function returns a slice of client connections with the new connection, or an error if occurred. return []*grpc.ClientConn{conn}, err } else { + // This block is executed if the serviceName is OpenImMessageGatewayName. + // 'ret' will accumulate the connections to return. var ret []*grpc.ClientConn + + // getMsgGatewayHost presumably retrieves hosts for the message gateway service. + // The context is passed, likely for cancellation and timeout control. gatewayHosts := getMsgGatewayHost(ctx) + + // Iterating over the retrieved gateway hosts. for _, host := range gatewayHosts { + // Establishes a connection to each host. + // Again, appending cli.options with any additional opts provided. conn, err := grpc.DialContext(ctx, host, append(cli.options, opts...)...) + + // If there's an error while dialing any host, the function returns immediately with the error. if err != nil { return nil, err } else { + // If the connection is successful, it is added to the 'ret' slice. ret = append(ret, conn) } } + // After all hosts are processed, the slice of connections is returned. return ret, nil } }