@ -16,184 +16,281 @@ package kubernetes
import (
"context"
"errors"
"fmt"
"os"
"strconv"
"strings"
"sync"
"time"
"github.com/openimsdk/tools/discovery"
"github.com/openimsdk/tools/log"
"github.com/stathat/consistent"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
)
// K8sDR represents the Kubernetes service discovery and registration client.
type K8sDR struct {
options [ ] grpc . DialOption
rpcRegisterAddr string
gatewayHostConsistent * consistent . Consistent
gatewayName string
type KubernetesConnManager struct {
clientset * kubernetes . Clientset
namespace string
dialOptions [ ] grpc . DialOption
selfTarget string
mu sync . RWMutex
connMap map [ string ] [ ] * grpc . ClientConn
}
func NewK8sDiscoveryRegister ( gatewayName string ) ( discovery . SvcDiscoveryRegistry , error ) {
gatewayConsistent := consistent . New ( )
gatewayHosts := getMsgGatewayHost ( context . Background ( ) , gatewayName )
for _ , v := range gatewayHosts {
gatewayConsistent . Add ( v )
// NewKubernetesConnManager creates a new connection manager that uses Kubernetes services for service discovery.
func NewKubernetesConnManager ( namespace string , options ... grpc . DialOption ) ( * KubernetesConnManager , error ) {
config, err := rest . InClusterConfig ( )
if err != nil {
return nil , fmt . Errorf ( "failed to create in-cluster config: %v" , err )
}
return & K8sDR { gatewayHostConsistent : gatewayConsistent } , nil
}
func ( cli * K8sDR ) Register ( serviceName , host string , port int , opts ... grpc . DialOption ) error {
if serviceName != cli . gatewayName {
cli . rpcRegisterAddr = serviceName
} else {
cli . rpcRegisterAddr = getSelfHost ( context . Background ( ) , cli . gatewayName )
clientset , err := kubernetes . NewForConfig ( config )
if err != nil {
return nil , fmt . Errorf ( "failed to create clientset: %v" , err )
}
return nil
}
k := & KubernetesConnManager {
clientset : clientset ,
namespace : namespace ,
dialOptions : options ,
connMap : make ( map [ string ] [ ] * grpc . ClientConn ) ,
}
func ( cli * K8sDR ) UnRegister ( ) error {
go k . watchEndpoints ( )
return nil
return k , nil
}
func ( cli * K8sDR ) CreateRpcRootNodes ( serviceNames [ ] string ) error {
func ( k * KubernetesConnManager ) initializeConns ( serviceName string ) error {
port , err := k . getServicePort ( serviceName )
if err != nil {
return err
}
return nil
}
endpoints , err := k . clientset . CoreV1 ( ) . Endpoints ( k . namespace ) . Get ( context . Background ( ) , serviceName , metav1 . GetOptions { } )
if err != nil {
return fmt . Errorf ( "failed to get endpoints for service %s: %v" , serviceName , err )
}
var conns [ ] * grpc . ClientConn
for _ , subset := range endpoints . Subsets {
for _ , address := range subset . Addresses {
target := fmt . Sprintf ( "%s:%d" , address . IP , port )
conn , err := grpc . Dial ( target , grpc . WithTransportCredentials ( insecure . NewCredentials ( ) ) )
if err != nil {
return fmt . Errorf ( "failed to dial endpoint %s: %v" , target , err )
}
conns = append ( conns , conn )
}
}
k . mu . Lock ( )
defer k . mu . Unlock ( )
k . connMap [ serviceName ] = conns
func ( cli * K8sDR ) RegisterConf2Registry ( key string , conf [ ] byte ) error {
// go k.watchEndpoints(serviceName)
return nil
}
func ( cli * K8sDR ) GetConfFromRegistry ( key string ) ( [ ] byte , error ) {
// GetConns returns gRPC client connections for a given Kubernetes service name.
func ( k * KubernetesConnManager ) GetConns ( ctx context . Context , serviceName string , opts ... grpc . DialOption ) ( [ ] * grpc . ClientConn , error ) {
k . mu . RLock ( )
conns , exists := k . connMap [ serviceName ]
defer k . mu . RUnlock ( )
return nil , nil
}
if exists {
return conns , nil
}
func ( cli * K8sDR ) GetUserIdHashGatewayHost ( ctx context . Context , userId string ) ( string , error ) {
host , err := cli . gatewayHostConsistent . Get ( userId )
if err != nil {
log . ZError ( ctx , "GetUserIdHashGatewayHost error" , err )
k . mu . Lock ( )
defer k . mu . Unlock ( )
// Check if another goroutine has already initialized the connections when we released the read lock
conns , exists = k . connMap [ serviceName ]
if exists {
return conns , nil
}
return host , err
}
// getSelfHost builds the cluster-local DNS address of the message-gateway pod
// this process is running in.
//
// gatewayName is expected to have the form "<name>:<port>". The pod name is read
// from the MY_POD_NAME environment variable and the namespace from
// MY_POD_NAMESPACE. The instance prefix is the pod-name segment before the first
// "-", and the ordinal is the segment after the last "-".
//
// Defaults (port 88, instance "openimserver", ordinal 0) are kept whenever the
// corresponding input is missing or cannot be parsed, so a malformed port no
// longer silently becomes 0 and an unset pod name no longer yields an empty
// instance prefix.
func getSelfHost(ctx context.Context, gatewayName string) string {
	port := 88
	instance := "openimserver"
	statefulIndex := 0

	selfPodName := os.Getenv("MY_POD_NAME")
	ns := os.Getenv("MY_POD_NAMESPACE")

	gatewayEnds := strings.Split(gatewayName, ":")
	if len(gatewayEnds) != 2 {
		log.ZError(ctx, "msggateway RpcRegisterName is error:config.RpcRegisterName.OpenImMessageGatewayName", errors.New("config error"))
	} else if parsed, err := strconv.Atoi(gatewayEnds[1]); err != nil {
		// Keep the default port instead of overwriting it with Atoi's zero value.
		log.ZError(ctx, "msggateway RpcRegisterName has a non-numeric port", err, "gatewayName", gatewayName)
	} else {
		port = parsed
	}

	if selfPodName != "" {
		podInfo := strings.Split(selfPodName, "-")
		instance = podInfo[0]
		// A pod name without a numeric suffix leaves the ordinal at 0.
		if idx, err := strconv.Atoi(podInfo[len(podInfo)-1]); err == nil {
			statefulIndex = idx
		}
	}

	return fmt.Sprintf("%s-openim-msggateway-%d.%s-openim-msggateway-headless.%s.svc.cluster.local:%d", instance, statefulIndex, instance, ns, port)
}
if err := k . initializeConns ( serviceName ) ; err != nil {
return nil , fmt . Errorf ( "failed to initialize connections for service %s: %v" , serviceName , err )
}
// getMsgGatewayHost returns one host address per message-gateway replica, e.g.
// openimserver-openim-msggateway-0.openimserver-openim-msggateway-headless.openim-lin.svc.cluster.local:88.
//
// gatewayName is expected to have the form "<name>:<port>". The replica count is
// read from MY_MSGGATEWAY_REPLICACOUNT, the namespace from MY_POD_NAMESPACE, and
// the instance prefix is the part of MY_POD_NAME before the first "-".
// Hosts are generated for ordinals 0 through replicas-1.
func getMsgGatewayHost ( ctx context . Context , gatewayName string ) [ ] string {
// Default port; replaced below when gatewayName splits into exactly two parts.
port := 88
// Initial value only; unconditionally overwritten from the pod name further down.
instance := "openimserver"
selfPodName := os . Getenv ( "MY_POD_NAME" )
replicas := os . Getenv ( "MY_MSGGATEWAY_REPLICACOUNT" )
ns := os . Getenv ( "MY_POD_NAMESPACE" )
gatewayEnds := strings . Split ( gatewayName , ":" )
if len ( gatewayEnds ) != 2 {
log . ZError ( ctx , "msggateway RpcRegisterName is error:config.RpcRegisterName.OpenImMessageGatewayName" , errors . New ( "config error" ) )
} else {
// The Atoi error is discarded, so a non-numeric port part sets port to 0.
port , _ = strconv . Atoi ( gatewayEnds [ 1 ] )
}
// The Atoi error is discarded, so an unset or non-numeric replica count yields no hosts.
nReplicas , _ := strconv . Atoi ( replicas )
podInfo := strings . Split ( selfPodName , "-" )
instance = podInfo [ 0 ]
var ret [ ] string
for i := 0 ; i < nReplicas ; i ++ {
host := fmt . Sprintf ( "%s-openim-msggateway-%d.%s-openim-msggateway-headless.%s.svc.cluster.local:%d" , instance , i , instance , ns , port )
ret = append ( ret , host )
}
log . ZDebug ( ctx , "getMsgGatewayHost" , "instance" , instance , "selfPodName" , selfPodName , "replicas" , replicas , "ns" , ns , "ret" , ret )
return ret
// NOTE(review): the statement below references k and serviceName, which are not
// declared in this function; it looks like it belongs to a different method — confirm.
return k . connMap [ serviceName ] , nil
}
// GetConns returns the gRPC client connections to the specified service.
func ( cli * K8sDR ) GetConns ( ctx context . Context , serviceName string , opts ... grpc . DialOption ) ( [ ] * grpc . ClientConn , error ) {
// This conditional checks if the serviceName is not the OpenImMessageGatewayName.
// It seems to handle a special case for the OpenImMessageGateway.
if serviceName != cli . gatewayName {
// DialContext creates a client connection to the given target (serviceName) using the specified context.
// 'cli.options' are likely default or common options for all connections in this struct.
// 'opts...' allows for additional gRPC dial options to be passed and used.
conn , err := grpc . DialContext ( ctx , serviceName , append ( cli . options , opts ... ) ... )
// The function returns a slice of client connections with the new connection, or an error if occurred.
return [ ] * grpc . ClientConn { conn } , err
} else {
// This block is executed if the serviceName is OpenImMessageGatewayName.
// 'ret' will accumulate the connections to return.
var ret [ ] * grpc . ClientConn
// getMsgGatewayHost presumably retrieves hosts for the message gateway service.
// The context is passed, likely for cancellation and timeout control.
gatewayHosts := getMsgGatewayHost ( ctx , cli . gatewayName )
// Iterating over the retrieved gateway hosts.
for _ , host := range gatewayHosts {
// Establishes a connection to each host.
// Again, appending cli.options with any additional opts provided.
conn , err := grpc . DialContext ( ctx , host , append ( cli . options , opts ... ) ... )
// If there's an error while dialing any host, the function returns immediately with the error.
// GetConn returns a single gRPC client connection for a given Kubernetes service name.
func ( k * KubernetesConnManager ) GetConn ( ctx context . Context , serviceName string , opts ... grpc . DialOption ) ( * grpc . ClientConn , error ) {
port , err := k . getServicePort ( serviceName )
if err != nil {
return nil , err
} else {
// If the connection is successful, it is added to the 'ret' slice.
ret = append ( ret , conn )
}
}
// After all hosts are processed, the slice of connections is returned.
return ret , nil
}
}
func ( cli * K8sDR ) GetConn ( ctx context . Context , serviceName string , opts ... grpc . DialOption ) ( * grpc . ClientConn , error ) {
fmt . Println ( "SVC port:" , port )
return grpc . DialContext ( ctx , serviceName , append ( cli . options , opts ... ) ... )
}
target := fmt . Sprintf ( "%s.%s.svc.cluster.local:%d" , serviceName , k . namespace , port )
fmt . Println ( "SVC target:" , target )
func ( cli * K8sDR ) GetSelfConnTarget ( ) string {
return grpc . DialContext (
ctx ,
target ,
append ( [ ] grpc . DialOption { grpc . WithTransportCredentials ( insecure . NewCredentials ( ) ) } , k . dialOptions ... ) ... ,
)
}
return cli . rpcRegisterAddr
// GetSelfConnTarget reports the connection target stored on the manager for
// the current service. The value is read from the selfTarget field as-is.
func (k *KubernetesConnManager) GetSelfConnTarget() string {
	target := k.selfTarget
	return target
}
func ( cli * K8sDR ) AddOption ( opts ... grpc . DialOption ) {
cli . options = append ( cli . options , opts ... )
// AddOption adds opts to the end of the manager's stored gRPC dial options.
// The write lock is held for the duration of the append.
func (k *KubernetesConnManager) AddOption(opts ...grpc.DialOption) {
	k.mu.Lock()
	k.dialOptions = append(k.dialOptions, opts...)
	k.mu.Unlock()
}
func ( cli * K8sDR ) CloseConn ( conn * grpc . ClientConn ) {
// CloseConn closes a single gRPC client connection.
//
// A nil conn is ignored instead of panicking. The error returned by Close is
// deliberately discarded: closing is best-effort here and the method has no
// error return to report it through.
func (k *KubernetesConnManager) CloseConn(conn *grpc.ClientConn) {
	if conn == nil {
		return
	}
	_ = conn.Close()
}
// Do not use this method to make RPC calls.
func ( cli * K8sDR ) GetClientLocalConns ( ) map [ string ] [ ] * grpc . ClientConn {
log . ZError ( context . Background ( ) , "should not call this function!" , nil )
// Close shuts down every gRPC connection held in the connection map and then
// replaces the map with a fresh, empty one. The write lock is held throughout.
// Errors from individual Close calls are intentionally ignored.
func (k *KubernetesConnManager) Close() {
	k.mu.Lock()
	defer k.mu.Unlock()

	for name := range k.connMap {
		pool := k.connMap[name]
		for i := range pool {
			_ = pool[i].Close()
		}
	}

	k.connMap = map[string][]*grpc.ClientConn{}
}
// Register is a no-op for the Kubernetes connection manager: it ignores all of
// its arguments and always returns nil.
func (k *KubernetesConnManager) Register(serviceName, host string, port int, opts ...grpc.DialOption) error {
	return nil
}
// UnRegister is a no-op for the Kubernetes connection manager and always
// returns nil.
func (k *KubernetesConnManager) UnRegister() error {
	return nil
}
func ( cli * K8sDR ) Close ( ) {
// GetUserIdHashGatewayHost is currently a stub: it ignores ctx and userId and
// always returns an empty host together with a nil error.
func (k *KubernetesConnManager) GetUserIdHashGatewayHost(ctx context.Context, userId string) (string, error) {
	var host string
	return host, nil
}
// getServicePort looks up the Kubernetes Service named serviceName in the
// manager's namespace and returns the port number of its first declared port.
//
// It returns an error when the Service cannot be fetched (the underlying API
// error is wrapped so callers can inspect it with errors.Is / errors.As) or
// when the Service declares no ports.
func (k *KubernetesConnManager) getServicePort(serviceName string) (int32, error) {
	svc, err := k.clientset.CoreV1().Services(k.namespace).Get(context.Background(), serviceName, metav1.GetOptions{})
	if err != nil {
		// The namespace is part of the error rather than a stray print to stdout.
		return 0, fmt.Errorf("failed to get service %s in namespace %s: %w", serviceName, k.namespace, err)
	}

	ports := svc.Spec.Ports
	if len(ports) == 0 {
		return 0, fmt.Errorf("service %s has no ports defined", serviceName)
	}

	return ports[0].Port, nil
}
// watchEndpoints watches Endpoints resources in the manager's namespace and
// forwards every add, update and delete notification to handleEndpointChange.
//
// The informer is built for Endpoints (not Pods) because handleEndpointChange
// only accepts *v1.Endpoints objects; with a Pod informer every event would be
// discarded. The factory is scoped to k.namespace, which is also where
// connections are resolved; an empty namespace still means "all namespaces".
//
// This method never returns and is meant to run in its own goroutine.
func (k *KubernetesConnManager) watchEndpoints() {
	informerFactory := informers.NewSharedInformerFactoryWithOptions(
		k.clientset,
		10*time.Minute, // resync period
		informers.WithNamespace(k.namespace),
	)
	informer := informerFactory.Core().V1().Endpoints().Informer()

	informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			k.handleEndpointChange(obj)
		},
		UpdateFunc: func(_, newObj interface{}) {
			k.handleEndpointChange(newObj)
		},
		DeleteFunc: func(obj interface{}) {
			k.handleEndpointChange(obj)
		},
	})

	// The background context is never cancelled, so its Done channel never
	// fires: the informers run for the life of the process and the receive
	// below blocks forever.
	stopCh := context.Background().Done()
	informerFactory.Start(stopCh)
	<-stopCh
}
// handleEndpointChange reacts to an informer notification by rebuilding the
// connections of the service the Endpoints object is named after.
//
// obj is expected to be a *v1.Endpoints. Delete notifications may instead carry
// a cache.DeletedFinalStateUnknown tombstone wrapping the last known object, so
// that wrapper is unwrapped first. Objects of any other type are ignored.
// A failure to rebuild the connections is printed and otherwise swallowed,
// because informer callbacks have no way to return an error.
func (k *KubernetesConnManager) handleEndpointChange(obj interface{}) {
	if tombstone, isTombstone := obj.(cache.DeletedFinalStateUnknown); isTombstone {
		obj = tombstone.Obj
	}

	endpoint, ok := obj.(*v1.Endpoints)
	if !ok {
		return
	}

	serviceName := endpoint.Name
	if err := k.initializeConns(serviceName); err != nil {
		fmt.Printf("Error initializing connections for %s: %v\n", serviceName, err)
	}
}
// =================
// initEndpoints initializes connections by fetching all available endpoints in the specified namespace.
// func (k *KubernetesConnManager) initEndpoints() error {
// k.mu.Lock()
// defer k.mu.Unlock()
// pods, err := k.clientset.CoreV1().Pods(k.namespace).List(context.TODO(), metav1.ListOptions{})
// if err != nil {
// return fmt.Errorf("failed to list pods: %v", err)
// }
// for _, pod := range pods.Items {
// if pod.Status.Phase == v1.PodRunning {
// target := fmt.Sprintf("%s:%d", address.IP, port)
// conn, err := grpc.Dial(target, grpc.WithTransportCredentials(insecure.NewCredentials()))
// conn, err := k.createGRPCConnection(pod)
// if err != nil {
// return fmt.Errorf("failed to create GRPC connection for pod %s: %v", pod.Name, err)
// }
// k.connMap[pod.Name] = append(k.connMap[pod.Name], conn)
// }
// }
// return nil
// }
// -----
// func (k *KubernetesConnManager) watchEndpoints1(serviceName string) {
// // watch for changes to the service's endpoints
// informerFactory := informers.NewSharedInformerFactory(k.clientset, time.Minute)
// endpointsInformer := informerFactory.Core().V1().Endpoints().Informer()
// endpointsInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
// AddFunc: func(obj interface{}) {
// eps := obj.(*v1.Endpoints)
// if eps.Name == serviceName {
// k.initializeConns(serviceName)
// }
// },
// UpdateFunc: func(oldObj, newObj interface{}) {
// eps := newObj.(*v1.Endpoints)
// if eps.Name == serviceName {
// k.initializeConns(serviceName)
// }
// },
// DeleteFunc: func(obj interface{}) {
// eps := obj.(*v1.Endpoints)
// if eps.Name == serviceName {
// k.mu.Lock()
// defer k.mu.Unlock()
// for _, conn := range k.connMap[serviceName] {
// _ = conn.Close()
// }
// delete(k.connMap, serviceName)
// }
// },
// })
// informerFactory.Start(wait.NeverStop)
// informerFactory.WaitForCacheSync(wait.NeverStop)
// }