diff --git a/go.mod b/go.mod index 53392f739..3398349be 100644 --- a/go.mod +++ b/go.mod @@ -15,7 +15,7 @@ require ( github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 github.com/mitchellh/mapstructure v1.5.0 github.com/openimsdk/protocol v0.0.72-alpha.61 - github.com/openimsdk/tools v0.0.50-alpha.46 + github.com/openimsdk/tools v0.0.50-alpha.51 github.com/pkg/errors v0.9.1 // indirect github.com/prometheus/client_golang v1.18.0 github.com/stretchr/testify v1.9.0 diff --git a/go.sum b/go.sum index 9bfbb38fb..2f1dfcfda 100644 --- a/go.sum +++ b/go.sum @@ -349,8 +349,8 @@ github.com/openimsdk/gomake v0.0.15-alpha.2 h1:5Q8yl8ezy2yx+q8/ucU/t4kJnDfCzNOrk github.com/openimsdk/gomake v0.0.15-alpha.2/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI= github.com/openimsdk/protocol v0.0.72-alpha.61 h1:RuZR9/Sg3p6Bpb2CKPjPoA2AUmTvHITmhZ3PT/RbWMs= github.com/openimsdk/protocol v0.0.72-alpha.61/go.mod h1:Iet+piS/jaS+kWWyj6EEr36mk4ISzIRYjoMSVA4dq2M= -github.com/openimsdk/tools v0.0.50-alpha.46 h1:j3HxPxhDptVHwr7eChL2rCH8mKfpUEcr4nHi5k4yDME= -github.com/openimsdk/tools v0.0.50-alpha.46/go.mod h1:muCtxguNJv8lFwLei27UASu2Nvg4ERSeN0R4K5tivk0= +github.com/openimsdk/tools v0.0.50-alpha.51 h1:M3dMUoHjggx5Ry6XSkK0FTSJmRQjjkSBpuzXiFzKtC4= +github.com/openimsdk/tools v0.0.50-alpha.51/go.mod h1:muCtxguNJv8lFwLei27UASu2Nvg4ERSeN0R4K5tivk0= github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM= github.com/pelletier/go-toml/v2 v2.2.2/go.mod h1:1t835xjRzz80PqgE6HHgN2JOsmgYu/h4qDAS4n929Rs= github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ= diff --git a/internal/msggateway/ws_server.go b/internal/msggateway/ws_server.go index 335556ae9..2e23262b1 100644 --- a/internal/msggateway/ws_server.go +++ b/internal/msggateway/ws_server.go @@ -3,15 +3,16 @@ package msggateway import ( "context" "fmt" - "github.com/openimsdk/open-im-server/v3/pkg/common/webhook" - "github.com/openimsdk/open-im-server/v3/pkg/rpccache" - pbAuth "github.com/openimsdk/protocol/auth" - "github.com/openimsdk/tools/mcontext" "net/http" "sync" "sync/atomic" "time" + "github.com/openimsdk/open-im-server/v3/pkg/common/webhook" + "github.com/openimsdk/open-im-server/v3/pkg/rpccache" + pbAuth "github.com/openimsdk/protocol/auth" + "github.com/openimsdk/tools/mcontext" + "github.com/go-playground/validator/v10" "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" "github.com/openimsdk/open-im-server/v3/pkg/common/servererrs" diff --git a/internal/push/onlinepusher.go b/internal/push/onlinepusher.go index f1ff7fed5..23e68339c 100644 --- a/internal/push/onlinepusher.go +++ b/internal/push/onlinepusher.go @@ -2,14 +2,19 @@ package push import ( "context" + "errors" + "sync" + "github.com/openimsdk/protocol/msggateway" "github.com/openimsdk/protocol/sdkws" "github.com/openimsdk/tools/discovery" + "github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/log" "github.com/openimsdk/tools/utils/datautil" "golang.org/x/sync/errgroup" "google.golang.org/grpc" - "sync" + + conf "github.com/openimsdk/open-im-server/v3/pkg/common/config" ) type OnlinePusher interface { @@ -37,15 +42,16 @@ func (u emptyOnlinePusher) GetOnlinePushFailedUserIDs(ctx context.Context, msg * } func NewOnlinePusher(disCov discovery.SvcDiscoveryRegistry, config *Config) OnlinePusher { - switch config.Discovery.Enable { - case "k8s": - return NewK8sStaticConsistentHash(disCov, config) - case "zookeeper": + + if config.runTimeEnv == conf.KUBERNETES { return NewDefaultAllNode(disCov, config) - case "etcd": + } + switch config.Discovery.Enable { + case conf.ETCD: return NewDefaultAllNode(disCov, config) default: - return newEmptyOnlinePusher() + log.ZError(context.Background(), "NewOnlinePusher is error", errs.Wrap(errors.New("unsupported discovery type")), "type", config.Discovery.Enable) + return nil } } diff --git a/internal/push/push.go b/internal/push/push.go index 7f14bced7..74a4a0a4a 100644 --- a/internal/push/push.go +++ b/internal/push/push.go @@ -10,6 +10,7 @@ import ( pbpush "github.com/openimsdk/protocol/push" "github.com/openimsdk/tools/db/redisutil" "github.com/openimsdk/tools/discovery" + "github.com/openimsdk/tools/utils/runtimeenv" "google.golang.org/grpc" ) @@ -32,6 +33,8 @@ type Config struct { LocalCacheConfig config.LocalCache Discovery config.Discovery FcmConfigPath string + + runTimeEnv string } func (p pushServer) PushMsg(ctx context.Context, req *pbpush.PushMsgReq) (*pbpush.PushMsgResp, error) { @@ -48,6 +51,8 @@ func (p pushServer) DelUserPushToken(ctx context.Context, } func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error { + config.runTimeEnv = runtimeenv.PrintRuntimeEnvironment() + rdb, err := redisutil.NewRedisClient(ctx, config.RedisConfig.Build()) if err != nil { return err diff --git a/pkg/common/config/constant.go b/pkg/common/config/constant.go index 9aeaedca9..f3fcc67ef 100644 --- a/pkg/common/config/constant.go +++ b/pkg/common/config/constant.go @@ -20,6 +20,7 @@ const ( MountConfigFilePath = "CONFIG_PATH" DeploymentType = "DEPLOYMENT_TYPE" KUBERNETES = "kubernetes" + ETCD = "etcd" ) const ( diff --git a/pkg/common/discoveryregister/discoveryregister.go b/pkg/common/discoveryregister/discoveryregister.go index 419293a91..ae4229e1b 100644 --- a/pkg/common/discoveryregister/discoveryregister.go +++ b/pkg/common/discoveryregister/discoveryregister.go @@ -19,6 +19,7 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/tools/discovery" + "google.golang.org/grpc" "github.com/openimsdk/tools/discovery/kubernetes" @@ -28,14 +29,16 @@ import ( // NewDiscoveryRegister creates a new service discovery and registry client based on the provided environment type. func NewDiscoveryRegister(discovery *config.Discovery, runtimeEnv string) (discovery.SvcDiscoveryRegistry, error) { - if runtimeEnv == "kubernetes" { - discovery.Enable = "kubernetes" + if runtimeEnv == config.KUBERNETES { + return kubernetes.NewKubernetesConnManager(discovery.Kubernetes.Namespace, + grpc.WithDefaultCallOptions( + grpc.MaxCallSendMsgSize(1024*1024*20), + ), + ) } switch discovery.Enable { - case "kubernetes": - return kubernetes.NewKubernetesConnManager(discovery.Kubernetes.Namespace) - case "etcd": + case config.ETCD: return etcd.NewSvcDiscoveryRegistry( discovery.Etcd.RootDirectory, discovery.Etcd.Address, diff --git a/pkg/common/discoveryregister/kubernetes/kubernetes.go b/pkg/common/discoveryregister/kubernetes/kubernetes.go index cf6a904be..459f20e0b 100644 --- a/pkg/common/discoveryregister/kubernetes/kubernetes.go +++ b/pkg/common/discoveryregister/kubernetes/kubernetes.go @@ -1,22 +1,10 @@ -// Copyright © 2023 OpenIM. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - package kubernetes import ( "context" "fmt" + "log" + "os" "sync" "time" @@ -35,6 +23,7 @@ type KubernetesConnManager struct { namespace string dialOptions []grpc.DialOption + rpcTargets map[string]string selfTarget string mu sync.RWMutex @@ -76,11 +65,14 @@ func (k *KubernetesConnManager) initializeConns(serviceName string) error { return fmt.Errorf("failed to get endpoints for service %s: %v", serviceName, err) } + // fmt.Println("Endpoints:", endpoints, "endpoints.Subsets:", endpoints.Subsets) + var conns []*grpc.ClientConn for _, subset := range endpoints.Subsets { for _, address := range subset.Addresses { target := fmt.Sprintf("%s:%d", address.IP, port) - conn, err := grpc.Dial(target, grpc.WithTransportCredentials(insecure.NewCredentials())) + // fmt.Println("IP target:", target) + conn, err := grpc.Dial(target, append(k.dialOptions, grpc.WithTransportCredentials(insecure.NewCredentials()))...) if err != nil { return fmt.Errorf("failed to dial endpoint %s: %v", target, err) } @@ -89,10 +81,8 @@ func (k *KubernetesConnManager) initializeConns(serviceName string) error { } k.mu.Lock() - defer k.mu.Unlock() k.connMap[serviceName] = conns - - // go k.watchEndpoints(serviceName) + k.mu.Unlock() return nil } @@ -100,23 +90,23 @@ func (k *KubernetesConnManager) initializeConns(serviceName string) error { // GetConns returns gRPC client connections for a given Kubernetes service name. func (k *KubernetesConnManager) GetConns(ctx context.Context, serviceName string, opts ...grpc.DialOption) ([]*grpc.ClientConn, error) { k.mu.RLock() - conns, exists := k.connMap[serviceName] - defer k.mu.RUnlock() + conns, exists := k.connMap[serviceName] + k.mu.RUnlock() if exists { return conns, nil } k.mu.Lock() - defer k.mu.Unlock() - // Check if another goroutine has already initialized the connections when we released the read lock conns, exists = k.connMap[serviceName] if exists { return conns, nil } + k.mu.Unlock() if err := k.initializeConns(serviceName); err != nil { + fmt.Println("Failed to initialize connections:", err) return nil, fmt.Errorf("failed to initialize connections for service %s: %v", serviceName, err) } @@ -125,26 +115,64 @@ func (k *KubernetesConnManager) GetConns(ctx context.Context, serviceName string // GetConn returns a single gRPC client connection for a given Kubernetes service name. func (k *KubernetesConnManager) GetConn(ctx context.Context, serviceName string, opts ...grpc.DialOption) (*grpc.ClientConn, error) { - port, err := k.getServicePort(serviceName) - if err != nil { - return nil, err - } + var target string - fmt.Println("SVC port:", port) + if k.rpcTargets[serviceName] == "" { + var err error + + svcPort, err := k.getServicePort(serviceName) + if err != nil { + return nil, err + } - target := fmt.Sprintf("%s.%s.svc.cluster.local:%d", serviceName, k.namespace, port) + target = fmt.Sprintf("%s.%s.svc.cluster.local:%d", serviceName, k.namespace, svcPort) - fmt.Println("SVC target:", target) + // fmt.Println("SVC target:", target) + } else { + target = k.rpcTargets[serviceName] + } return grpc.DialContext( ctx, target, - append([]grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())}, k.dialOptions...)..., + append([]grpc.DialOption{ + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(1024*1024*10), grpc.MaxCallSendMsgSize(1024*1024*20)), + }, k.dialOptions...)..., ) } // GetSelfConnTarget returns the connection target for the current service. func (k *KubernetesConnManager) GetSelfConnTarget() string { + if k.selfTarget == "" { + hostName := os.Getenv("HOSTNAME") + + pod, err := k.clientset.CoreV1().Pods(k.namespace).Get(context.Background(), hostName, metav1.GetOptions{}) + if err != nil { + log.Printf("failed to get pod %s: %v \n", hostName, err) + } + + for pod.Status.PodIP == "" { + pod, err = k.clientset.CoreV1().Pods(k.namespace).Get(context.TODO(), hostName, metav1.GetOptions{}) + if err != nil { + log.Printf("Error getting pod: %v \n", err) + } + + time.Sleep(3 * time.Second) + } + + var selfPort int32 + + for _, port := range pod.Spec.Containers[0].Ports { + if port.ContainerPort != 10001 { + selfPort = port.ContainerPort + break + } + } + + k.selfTarget = fmt.Sprintf("%s:%d", pod.Status.PodIP, selfPort) + } + return k.selfTarget } @@ -175,6 +203,7 @@ func (k *KubernetesConnManager) Close() { func (k *KubernetesConnManager) Register(serviceName, host string, port int, opts ...grpc.DialOption) error { return nil } + func (k *KubernetesConnManager) UnRegister() error { return nil } @@ -184,6 +213,8 @@ func (k *KubernetesConnManager) GetUserIdHashGatewayHost(ctx context.Context, us } func (k *KubernetesConnManager) getServicePort(serviceName string) (int32, error) { + var svcPort int32 + svc, err := k.clientset.CoreV1().Services(k.namespace).Get(context.Background(), serviceName, metav1.GetOptions{}) if err != nil { fmt.Print("namespace:", k.namespace) @@ -194,7 +225,15 @@ func (k *KubernetesConnManager) getServicePort(serviceName string) (int32, error return 0, fmt.Errorf("service %s has no ports defined", serviceName) } - return svc.Spec.Ports[0].Port, nil + for _, port := range svc.Spec.Ports { + // fmt.Println(serviceName, " Now Get Port:", port.Port) + if port.Port != 10001 { + svcPort = port.Port + break + } + } + + return svcPort, nil } // watchEndpoints listens for changes in Pod resources. @@ -229,68 +268,3 @@ func (k *KubernetesConnManager) handleEndpointChange(obj interface{}) { fmt.Printf("Error initializing connections for %s: %v\n", serviceName, err) } } - -// ================= - -// initEndpoints initializes connections by fetching all available endpoints in the specified namespace. - -// func (k *KubernetesConnManager) initEndpoints() error { -// k.mu.Lock() -// defer k.mu.Unlock() - -// pods, err := k.clientset.CoreV1().Pods(k.namespace).List(context.TODO(), metav1.ListOptions{}) -// if err != nil { -// return fmt.Errorf("failed to list pods: %v", err) -// } - -// for _, pod := range pods.Items { -// if pod.Status.Phase == v1.PodRunning { -// target := fmt.Sprintf("%s:%d", address.IP, port) -// conn, err := grpc.Dial(target, grpc.WithTransportCredentials(insecure.NewCredentials())) -// conn, err := k.createGRPCConnection(pod) -// if err != nil { -// return fmt.Errorf("failed to create GRPC connection for pod %s: %v", pod.Name, err) -// } -// k.connMap[pod.Name] = append(k.connMap[pod.Name], conn) -// } -// } - -// return nil -// } - -// ----- - -// func (k *KubernetesConnManager) watchEndpoints1(serviceName string) { -// // watch for changes to the service's endpoints -// informerFactory := informers.NewSharedInformerFactory(k.clientset, time.Minute) -// endpointsInformer := informerFactory.Core().V1().Endpoints().Informer() - -// endpointsInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ -// AddFunc: func(obj interface{}) { -// eps := obj.(*v1.Endpoints) -// if eps.Name == serviceName { -// k.initializeConns(serviceName) -// } -// }, -// UpdateFunc: func(oldObj, newObj interface{}) { -// eps := newObj.(*v1.Endpoints) -// if eps.Name == serviceName { -// k.initializeConns(serviceName) -// } -// }, -// DeleteFunc: func(obj interface{}) { -// eps := obj.(*v1.Endpoints) -// if eps.Name == serviceName { -// k.mu.Lock() -// defer k.mu.Unlock() -// for _, conn := range k.connMap[serviceName] { -// _ = conn.Close() -// } -// delete(k.connMap, serviceName) -// } -// }, -// }) - -// informerFactory.Start(wait.NeverStop) -// informerFactory.WaitForCacheSync(wait.NeverStop) -// }