From 466da7664742f8884ebee29756238c7720cc0000 Mon Sep 17 00:00:00 2001 From: "lin.huang" Date: Sun, 24 Sep 2023 06:17:29 +0800 Subject: [PATCH] Code adaptation k8s: service discovery and registration adaptation, configuration adaptation --- cmd/openim-api/main.go | 19 ++-- cmd/openim-push/main.go | 2 +- cmd/openim-rpc/openim-rpc-auth/main.go | 2 +- .../openim-rpc-conversation/main.go | 2 +- cmd/openim-rpc/openim-rpc-friend/main.go | 2 +- cmd/openim-rpc/openim-rpc-group/main.go | 2 +- cmd/openim-rpc/openim-rpc-msg/main.go | 2 +- cmd/openim-rpc/openim-rpc-third/main.go | 2 +- cmd/openim-rpc/openim-rpc-user/main.go | 2 +- config/config.yaml | 23 +++-- internal/msgtransfer/init.go | 15 ++-- internal/tools/msg.go | 14 +-- pkg/common/cmd/api.go | 19 +++- pkg/common/cmd/constant.go | 12 +++ pkg/common/cmd/cron_task.go | 4 +- pkg/common/cmd/msg_gateway.go | 22 ++++- pkg/common/cmd/msg_transfer.go | 6 +- pkg/common/cmd/root.go | 23 ++++- pkg/common/cmd/rpc.go | 45 +++++++++- pkg/common/config/config.go | 12 ++- .../k8s_discovery_register.go | 90 +++++++++++++++++++ pkg/common/startrpc/start.go | 21 +++-- 22 files changed, 274 insertions(+), 67 deletions(-) create mode 100644 pkg/common/cmd/constant.go create mode 100644 pkg/common/discovery_register/k8s_discovery_register.go diff --git a/cmd/openim-api/main.go b/cmd/openim-api/main.go index cf452450c..93f7252b7 100644 --- a/cmd/openim-api/main.go +++ b/cmd/openim-api/main.go @@ -17,15 +17,13 @@ package main import ( "context" "fmt" + "github.com/openimsdk/open-im-server/v3/pkg/common/discovery_register" "net" - "strconv" - "time" - _ "net/http/pprof" + "strconv" "github.com/OpenIMSDK/protocol/constant" "github.com/OpenIMSDK/tools/discoveryregistry" - openkeeper "github.com/OpenIMSDK/tools/discoveryregistry/zookeeper" "github.com/OpenIMSDK/tools/log" "github.com/openimsdk/open-im-server/v3/internal/api" @@ -44,6 +42,7 @@ func main() { } func run(port int) error { + fmt.Println("*****openimapi port:", port) if port == 0 { return fmt.Errorf("port is empty") } @@ -53,11 +52,13 @@ func run(port int) error { } fmt.Println("api start init discov client") var client discoveryregistry.SvcDiscoveryRegistry - client, err = openkeeper.NewClient(config.Config.Zookeeper.ZkAddr, config.Config.Zookeeper.Schema, - openkeeper.WithFreq(time.Hour), openkeeper.WithUserNameAndPassword( - config.Config.Zookeeper.Username, - config.Config.Zookeeper.Password, - ), openkeeper.WithRoundRobin(), openkeeper.WithTimeout(10), openkeeper.WithLogger(log.NewZkLogger())) + client, err = discovery_register.NewDiscoveryRegister(config.Config.Envs.Discovery) + /* + client, err = openkeeper.NewClient(config.Config.Zookeeper.ZkAddr, config.Config.Zookeeper.Schema, + openkeeper.WithFreq(time.Hour), openkeeper.WithUserNameAndPassword( + config.Config.Zookeeper.Username, + config.Config.Zookeeper.Password, + ), openkeeper.WithRoundRobin(), openkeeper.WithTimeout(10), openkeeper.WithLogger(log.NewZkLogger()))*/ if err != nil { return err } diff --git a/cmd/openim-push/main.go b/cmd/openim-push/main.go index 6ab2fedab..c19cfda60 100644 --- a/cmd/openim-push/main.go +++ b/cmd/openim-push/main.go @@ -21,7 +21,7 @@ import ( ) func main() { - pushCmd := cmd.NewRpcCmd("push") + pushCmd := cmd.NewRpcCmd(cmd.RpcPushServer) pushCmd.AddPortFlag() pushCmd.AddPrometheusPortFlag() if err := pushCmd.Exec(); err != nil { diff --git a/cmd/openim-rpc/openim-rpc-auth/main.go b/cmd/openim-rpc/openim-rpc-auth/main.go index d27a3f11b..645d8cab8 100644 --- a/cmd/openim-rpc/openim-rpc-auth/main.go +++ b/cmd/openim-rpc/openim-rpc-auth/main.go @@ -21,7 +21,7 @@ import ( ) func main() { - authCmd := cmd.NewRpcCmd("auth") + authCmd := cmd.NewRpcCmd(cmd.RpcAuthServer) authCmd.AddPortFlag() authCmd.AddPrometheusPortFlag() if err := authCmd.Exec(); err != nil { diff --git a/cmd/openim-rpc/openim-rpc-conversation/main.go b/cmd/openim-rpc/openim-rpc-conversation/main.go index 80a92de92..13d7db605 100644 --- a/cmd/openim-rpc/openim-rpc-conversation/main.go +++ b/cmd/openim-rpc/openim-rpc-conversation/main.go @@ -21,7 +21,7 @@ import ( ) func main() { - rpcCmd := cmd.NewRpcCmd("conversation") + rpcCmd := cmd.NewRpcCmd(cmd.RpcConversationServer) rpcCmd.AddPortFlag() rpcCmd.AddPrometheusPortFlag() if err := rpcCmd.Exec(); err != nil { diff --git a/cmd/openim-rpc/openim-rpc-friend/main.go b/cmd/openim-rpc/openim-rpc-friend/main.go index c0c91e4dc..ec18306a2 100644 --- a/cmd/openim-rpc/openim-rpc-friend/main.go +++ b/cmd/openim-rpc/openim-rpc-friend/main.go @@ -21,7 +21,7 @@ import ( ) func main() { - rpcCmd := cmd.NewRpcCmd("friend") + rpcCmd := cmd.NewRpcCmd(cmd.RpcFriendServer) rpcCmd.AddPortFlag() rpcCmd.AddPrometheusPortFlag() if err := rpcCmd.Exec(); err != nil { diff --git a/cmd/openim-rpc/openim-rpc-group/main.go b/cmd/openim-rpc/openim-rpc-group/main.go index d2d4f727a..887329926 100644 --- a/cmd/openim-rpc/openim-rpc-group/main.go +++ b/cmd/openim-rpc/openim-rpc-group/main.go @@ -21,7 +21,7 @@ import ( ) func main() { - rpcCmd := cmd.NewRpcCmd("group") + rpcCmd := cmd.NewRpcCmd(cmd.RpcGroupServer) rpcCmd.AddPortFlag() rpcCmd.AddPrometheusPortFlag() if err := rpcCmd.Exec(); err != nil { diff --git a/cmd/openim-rpc/openim-rpc-msg/main.go b/cmd/openim-rpc/openim-rpc-msg/main.go index 0ac258d0c..dcc3abef5 100644 --- a/cmd/openim-rpc/openim-rpc-msg/main.go +++ b/cmd/openim-rpc/openim-rpc-msg/main.go @@ -21,7 +21,7 @@ import ( ) func main() { - rpcCmd := cmd.NewRpcCmd("msg") + rpcCmd := cmd.NewRpcCmd(cmd.RpcMsgServer) rpcCmd.AddPortFlag() rpcCmd.AddPrometheusPortFlag() if err := rpcCmd.Exec(); err != nil { diff --git a/cmd/openim-rpc/openim-rpc-third/main.go b/cmd/openim-rpc/openim-rpc-third/main.go index 913962b82..cf0bf4b70 100644 --- a/cmd/openim-rpc/openim-rpc-third/main.go +++ b/cmd/openim-rpc/openim-rpc-third/main.go @@ -21,7 +21,7 @@ import ( ) func main() { - rpcCmd := cmd.NewRpcCmd("third") + rpcCmd := cmd.NewRpcCmd(cmd.RpcThirdServer) rpcCmd.AddPortFlag() rpcCmd.AddPrometheusPortFlag() if err := rpcCmd.Exec(); err != nil { diff --git a/cmd/openim-rpc/openim-rpc-user/main.go b/cmd/openim-rpc/openim-rpc-user/main.go index f2ebc94db..cbf2a8fc3 100644 --- a/cmd/openim-rpc/openim-rpc-user/main.go +++ b/cmd/openim-rpc/openim-rpc-user/main.go @@ -21,7 +21,7 @@ import ( ) func main() { - rpcCmd := cmd.NewRpcCmd("user") + rpcCmd := cmd.NewRpcCmd(cmd.RpcUserServer) rpcCmd.AddPortFlag() rpcCmd.AddPrometheusPortFlag() if err := rpcCmd.Exec(); err != nil { diff --git a/config/config.yaml b/config/config.yaml index 9d6b3c335..c89fec549 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -26,6 +26,10 @@ # Zookeeper address # Zookeeper username # Zookeeper password +envs: + discovery: zookeeper #k8s + serverTag: develop + zookeeper: schema: openim address: [ 172.28.0.1:12181 ] @@ -173,15 +177,15 @@ rpcPort: ###################### RPC Register Name Configuration ###################### # RPC service names for registration, it's not recommended to modify these rpcRegisterName: - openImUserName: User - openImFriendName: Friend - openImMsgName: Msg - openImPushName: Push - openImMessageGatewayName: MessageGateway - openImGroupName: Group - openImAuthName: Auth - openImConversationName: Conversation - openImThirdName: Third + openImUserName: Rpc-User + openImFriendName: Rpc-Friend + openImMsgName: Rpc-Msg + openImPushName: Rpc-Push + openImMessageGatewayName: Rpc-MessageGateway + openImGroupName: Rpc-Group + openImAuthName: Rpc-Auth + openImConversationName: Rpc-Conversation + openImThirdName: Rpc-Third ###################### Log Configuration ###################### # Log configuration @@ -211,6 +215,7 @@ log: # Websocket connection handshake timeout longConnSvr: openImWsPort: [ 10001 ] + openImMessageGatewayPort: [ 10140 ] websocketMaxConnNum: 100000 websocketMaxMsgLen: 4096 websocketTimeout: 10 diff --git a/internal/msgtransfer/init.go b/internal/msgtransfer/init.go index db48ead70..4487826ee 100644 --- a/internal/msgtransfer/init.go +++ b/internal/msgtransfer/init.go @@ -16,14 +16,11 @@ package msgtransfer import ( "fmt" - "sync" - "time" - + "github.com/openimsdk/open-im-server/v3/pkg/common/discovery_register" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" + "sync" - openkeeper "github.com/OpenIMSDK/tools/discoveryregistry/zookeeper" - "github.com/OpenIMSDK/tools/log" "github.com/OpenIMSDK/tools/mw" "github.com/openimsdk/open-im-server/v3/pkg/common/config" @@ -62,9 +59,11 @@ func StartTransfer(prometheusPort int) error { if err := mongo.CreateMsgIndex(); err != nil { return err } - client, err := openkeeper.NewClient(config.Config.Zookeeper.ZkAddr, config.Config.Zookeeper.Schema, - openkeeper.WithFreq(time.Hour), openkeeper.WithRoundRobin(), openkeeper.WithUserNameAndPassword(config.Config.Zookeeper.Username, - config.Config.Zookeeper.Password), openkeeper.WithTimeout(10), openkeeper.WithLogger(log.NewZkLogger())) + client, err := discovery_register.NewDiscoveryRegister(config.Config.Envs.Discovery) + /* + client, err := openkeeper.NewClient(config.Config.Zookeeper.ZkAddr, config.Config.Zookeeper.Schema, + openkeeper.WithFreq(time.Hour), openkeeper.WithRoundRobin(), openkeeper.WithUserNameAndPassword(config.Config.Zookeeper.Username, + config.Config.Zookeeper.Password), openkeeper.WithTimeout(10), openkeeper.WithLogger(log.NewZkLogger()))*/ if err != nil { return err } diff --git a/internal/tools/msg.go b/internal/tools/msg.go index 93f5c3a8a..5397689b2 100644 --- a/internal/tools/msg.go +++ b/internal/tools/msg.go @@ -17,14 +17,12 @@ package tools import ( "context" "fmt" - "math" - "time" - + "github.com/openimsdk/open-im-server/v3/pkg/common/discovery_register" "github.com/redis/go-redis/v9" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" + "math" - "github.com/OpenIMSDK/tools/discoveryregistry/zookeeper" "github.com/OpenIMSDK/tools/errs" "github.com/OpenIMSDK/tools/log" "github.com/OpenIMSDK/tools/mcontext" @@ -74,9 +72,11 @@ func InitMsgTool() (*MsgTool, error) { if err != nil { return nil, err } - discov, err := zookeeper.NewClient(config.Config.Zookeeper.ZkAddr, config.Config.Zookeeper.Schema, - zookeeper.WithFreq(time.Hour), zookeeper.WithRoundRobin(), zookeeper.WithUserNameAndPassword(config.Config.Zookeeper.Username, - config.Config.Zookeeper.Password), zookeeper.WithTimeout(10), zookeeper.WithLogger(log.NewZkLogger())) + discov, err := discovery_register.NewDiscoveryRegister(config.Config.Envs.Discovery) + /* + discov, err := zookeeper.NewClient(config.Config.Zookeeper.ZkAddr, config.Config.Zookeeper.Schema, + zookeeper.WithFreq(time.Hour), zookeeper.WithRoundRobin(), zookeeper.WithUserNameAndPassword(config.Config.Zookeeper.Username, + config.Config.Zookeeper.Password), zookeeper.WithTimeout(10), zookeeper.WithLogger(log.NewZkLogger()))*/ if err != nil { return nil, err } diff --git a/pkg/common/cmd/api.go b/pkg/common/cmd/api.go index 4cb5e34fc..19b11d4b2 100644 --- a/pkg/common/cmd/api.go +++ b/pkg/common/cmd/api.go @@ -14,14 +14,21 @@ package cmd -import "github.com/spf13/cobra" +import ( + "fmt" + "github.com/OpenIMSDK/protocol/constant" + config2 "github.com/openimsdk/open-im-server/v3/pkg/common/config" + "github.com/spf13/cobra" +) type ApiCmd struct { *RootCmd } func NewApiCmd() *ApiCmd { - return &ApiCmd{NewRootCmd("api")} + ret := &ApiCmd{NewRootCmd("api")} + ret.SetRootCmdPt(ret) + return ret } func (a *ApiCmd) AddApi(f func(port int) error) { @@ -29,3 +36,11 @@ func (a *ApiCmd) AddApi(f func(port int) error) { return f(a.getPortFlag(cmd)) } } +func (a *ApiCmd) GetPortFromConfig(portType string) int { + fmt.Println("GetPortFromConfig:", portType) + if portType == constant.FlagPort { + return config2.Config.Api.OpenImApiPort[0] + } else { + return 0 + } +} diff --git a/pkg/common/cmd/constant.go b/pkg/common/cmd/constant.go new file mode 100644 index 000000000..835593bbe --- /dev/null +++ b/pkg/common/cmd/constant.go @@ -0,0 +1,12 @@ +package cmd + +const ( + RpcPushServer = "push" + RpcAuthServer = "auth" + RpcConversationServer = "conversation" + RpcFriendServer = "friend" + RpcGroupServer = "group" + RpcMsgServer = "msg" + RpcThirdServer = "third" + RpcUserServer = "user" +) diff --git a/pkg/common/cmd/cron_task.go b/pkg/common/cmd/cron_task.go index 25dc9aae4..1b0e796ac 100644 --- a/pkg/common/cmd/cron_task.go +++ b/pkg/common/cmd/cron_task.go @@ -21,7 +21,9 @@ type CronTaskCmd struct { } func NewCronTaskCmd() *CronTaskCmd { - return &CronTaskCmd{NewRootCmd("cronTask", WithCronTaskLogName())} + ret := &CronTaskCmd{NewRootCmd("cronTask", WithCronTaskLogName())} + ret.SetRootCmdPt(ret) + return ret } func (c *CronTaskCmd) addRunE(f func() error) { diff --git a/pkg/common/cmd/msg_gateway.go b/pkg/common/cmd/msg_gateway.go index 34a9f3b4e..c96bbd7af 100644 --- a/pkg/common/cmd/msg_gateway.go +++ b/pkg/common/cmd/msg_gateway.go @@ -16,6 +16,8 @@ package cmd import ( "github.com/openimsdk/open-im-server/v3/internal/msggateway" + config2 "github.com/openimsdk/open-im-server/v3/pkg/common/config" + //"github.com/openimsdk/open-im-server/internal/msggateway". "github.com/spf13/cobra" @@ -26,8 +28,10 @@ type MsgGatewayCmd struct { *RootCmd } -func NewMsgGatewayCmd() MsgGatewayCmd { - return MsgGatewayCmd{NewRootCmd("msgGateway")} +func NewMsgGatewayCmd() *MsgGatewayCmd { + ret := &MsgGatewayCmd{NewRootCmd("msgGateway")} + ret.SetRootCmdPt(ret) + return ret } func (m *MsgGatewayCmd) AddWsPortFlag() { @@ -36,6 +40,9 @@ func (m *MsgGatewayCmd) AddWsPortFlag() { func (m *MsgGatewayCmd) getWsPortFlag(cmd *cobra.Command) int { port, _ := cmd.Flags().GetInt(constant.FlagWsPort) + if port == 0 { + port = m.PortFromConfig(constant.FlagWsPort) + } return port } @@ -49,3 +56,14 @@ func (m *MsgGatewayCmd) Exec() error { m.addRunE() return m.Execute() } +func (m *MsgGatewayCmd) GetPortFromConfig(portType string) int { + if portType == constant.FlagWsPort { + return config2.Config.LongConnSvr.OpenImWsPort[0] + } else if portType == constant.FlagPort { + return config2.Config.LongConnSvr.OpenImMessageGatewayPort[0] + } else if portType == constant.FlagPrometheusPort { + return 0 + } else { + return 0 + } +} diff --git a/pkg/common/cmd/msg_transfer.go b/pkg/common/cmd/msg_transfer.go index ae67ee9f7..20349ebbb 100644 --- a/pkg/common/cmd/msg_transfer.go +++ b/pkg/common/cmd/msg_transfer.go @@ -24,8 +24,10 @@ type MsgTransferCmd struct { *RootCmd } -func NewMsgTransferCmd() MsgTransferCmd { - return MsgTransferCmd{NewRootCmd("msgTransfer")} +func NewMsgTransferCmd() *MsgTransferCmd { + ret := &MsgTransferCmd{NewRootCmd("msgTransfer")} + ret.SetRootCmdPt(ret) + return ret } func (m *MsgTransferCmd) addRunE() { diff --git a/pkg/common/cmd/root.go b/pkg/common/cmd/root.go index 508ef8377..6973da9d6 100644 --- a/pkg/common/cmd/root.go +++ b/pkg/common/cmd/root.go @@ -26,11 +26,15 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/common/config" ) +type RootCmdPt interface { + GetPortFromConfig(portType string) int +} type RootCmd struct { Command cobra.Command Name string port int prometheusPort int + cmdItf RootCmdPt } type CmdOpts struct { @@ -76,7 +80,9 @@ func NewRootCmd(name string, opts ...func(*CmdOpts)) (rootCmd *RootCmd) { rootCmd.addConfFlag() return rootCmd } - +func (r *RootCmd) SetRootCmdPt(cmdItf RootCmdPt) { + r.cmdItf = cmdItf +} func (r *RootCmd) addConfFlag() { r.Command.Flags().StringP(constant.FlagConf, "c", "", "Path to config file folder") } @@ -87,6 +93,9 @@ func (r *RootCmd) AddPortFlag() { func (r *RootCmd) getPortFlag(cmd *cobra.Command) int { port, _ := cmd.Flags().GetInt(constant.FlagPort) + if port == 0 { + port = r.PortFromConfig(constant.FlagPort) + } return port } @@ -100,6 +109,9 @@ func (r *RootCmd) AddPrometheusPortFlag() { func (r *RootCmd) getPrometheusPortFlag(cmd *cobra.Command) int { port, _ := cmd.Flags().GetInt(constant.FlagPrometheusPort) + if port == 0 { + port = r.PortFromConfig(constant.FlagPrometheusPort) + } return port } @@ -120,3 +132,12 @@ func (r *RootCmd) Execute() error { func (r *RootCmd) AddCommand(cmds ...*cobra.Command) { r.Command.AddCommand(cmds...) } + +func (r *RootCmd) GetPortFromConfig(portType string) int { + fmt.Println("RootCmd.GetPortFromConfig:", portType) + return 0 +} +func (r *RootCmd) PortFromConfig(portType string) int { + fmt.Println("PortFromConfig:", portType) + return r.cmdItf.GetPortFromConfig(portType) +} diff --git a/pkg/common/cmd/rpc.go b/pkg/common/cmd/rpc.go index a5fc1164c..65e668145 100644 --- a/pkg/common/cmd/rpc.go +++ b/pkg/common/cmd/rpc.go @@ -16,7 +16,8 @@ package cmd import ( "errors" - + "github.com/OpenIMSDK/protocol/constant" + config2 "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/spf13/cobra" "google.golang.org/grpc" @@ -30,8 +31,9 @@ type RpcCmd struct { } func NewRpcCmd(name string) *RpcCmd { - authCmd := &RpcCmd{NewRootCmd(name)} - return authCmd + ret := &RpcCmd{NewRootCmd(name)} + ret.SetRootCmdPt(ret) + return ret } func (a *RpcCmd) Exec() error { @@ -51,3 +53,40 @@ func (a *RpcCmd) StartSvr( } return startrpc.Start(a.GetPortFlag(), name, a.GetPrometheusPortFlag(), rpcFn) } +func (a *RpcCmd) GetPortFromConfig(portType string) int { + switch a.Name { + case RpcPushServer: + if portType == constant.FlagPort { + return config2.Config.RpcPort.OpenImPushPort[0] + } + case RpcAuthServer: + if portType == constant.FlagPort { + return config2.Config.RpcPort.OpenImAuthPort[0] + } + case RpcConversationServer: + if portType == constant.FlagPort { + return config2.Config.RpcPort.OpenImConversationPort[0] + } + case RpcFriendServer: + if portType == constant.FlagPort { + return config2.Config.RpcPort.OpenImFriendPort[0] + } + case RpcGroupServer: + if portType == constant.FlagPort { + return config2.Config.RpcPort.OpenImGroupPort[0] + } + case RpcMsgServer: + if portType == constant.FlagPort { + return config2.Config.RpcPort.OpenImMessageGatewayPort[0] + } + case RpcThirdServer: + if portType == constant.FlagPort { + return config2.Config.RpcPort.OpenImThirdPort[0] + } + case RpcUserServer: + if portType == constant.FlagPort { + return config2.Config.RpcPort.OpenImUserPort[0] + } + } + return 0 +} diff --git a/pkg/common/config/config.go b/pkg/common/config/config.go index c6dd41419..60e0085fe 100644 --- a/pkg/common/config/config.go +++ b/pkg/common/config/config.go @@ -46,6 +46,9 @@ type POfflinePush struct { } type configStruct struct { + Envs struct { + Discovery string `yaml:"discovery"` + } Zookeeper struct { Schema string `yaml:"schema"` ZkAddr []string `yaml:"address"` @@ -181,10 +184,11 @@ type configStruct struct { } `yaml:"log"` LongConnSvr struct { - OpenImWsPort []int `yaml:"openImWsPort"` - WebsocketMaxConnNum int `yaml:"websocketMaxConnNum"` - WebsocketMaxMsgLen int `yaml:"websocketMaxMsgLen"` - WebsocketTimeout int `yaml:"websocketTimeout"` + OpenImMessageGatewayPort []int `yaml:"openImMessageGatewayPort"` + OpenImWsPort []int `yaml:"openImWsPort"` + WebsocketMaxConnNum int `yaml:"websocketMaxConnNum"` + WebsocketMaxMsgLen int `yaml:"websocketMaxMsgLen"` + WebsocketTimeout int `yaml:"websocketTimeout"` } `yaml:"longConnSvr"` Push struct { diff --git a/pkg/common/discovery_register/k8s_discovery_register.go b/pkg/common/discovery_register/k8s_discovery_register.go new file mode 100644 index 000000000..06d8b2566 --- /dev/null +++ b/pkg/common/discovery_register/k8s_discovery_register.go @@ -0,0 +1,90 @@ +package discovery_register + +import ( + "context" + "errors" + "fmt" + "github.com/OpenIMSDK/tools/discoveryregistry" + openkeeper "github.com/OpenIMSDK/tools/discoveryregistry/zookeeper" + "github.com/OpenIMSDK/tools/log" + "github.com/openimsdk/open-im-server/v3/pkg/common/config" + "google.golang.org/grpc" + "net" + "strconv" + "time" +) + +func NewDiscoveryRegister(envType string) (discoveryregistry.SvcDiscoveryRegistry, error) { + var client discoveryregistry.SvcDiscoveryRegistry + var err error + switch envType { + case "zookeeper": + client, err = openkeeper.NewClient(config.Config.Zookeeper.ZkAddr, config.Config.Zookeeper.Schema, + openkeeper.WithFreq(time.Hour), openkeeper.WithUserNameAndPassword( + config.Config.Zookeeper.Username, + config.Config.Zookeeper.Password, + ), openkeeper.WithRoundRobin(), openkeeper.WithTimeout(10), openkeeper.WithLogger(log.NewZkLogger())) + case "k8s": + client, err = NewK8sDiscoveryRegister() + default: + client = nil + err = errors.New("envType not correct") + } + return client, err +} + +type K8sDR struct { + rpcRegisterAddr string +} + +func NewK8sDiscoveryRegister() (discoveryregistry.SvcDiscoveryRegistry, error) { + return &K8sDR{}, nil +} + +func (cli *K8sDR) Register(serviceName, host string, port int, opts ...grpc.DialOption) error { + + cli.rpcRegisterAddr = net.JoinHostPort(host, strconv.Itoa(port)) + return nil +} +func (cli *K8sDR) UnRegister() error { + + return nil +} +func (cli *K8sDR) CreateRpcRootNodes(serviceNames []string) error { + + return nil +} +func (cli *K8sDR) RegisterConf2Registry(key string, conf []byte) error { + + return nil +} + +func (cli *K8sDR) GetConfFromRegistry(key string) ([]byte, error) { + + return nil, nil +} +func (cli *K8sDR) GetConns(ctx context.Context, serviceName string, opts ...grpc.DialOption) ([]*grpc.ClientConn, error) { + + conn, err := grpc.DialContext(ctx, serviceName, grpc.WithInsecure()) + return []*grpc.ClientConn{conn}, err +} +func (cli *K8sDR) GetConn(ctx context.Context, serviceName string, opts ...grpc.DialOption) (*grpc.ClientConn, error) { + + return grpc.DialContext(ctx, serviceName, grpc.WithInsecure()) +} +func (cli *K8sDR) GetSelfConnTarget() string { + + return cli.rpcRegisterAddr +} +func (cli *K8sDR) AddOption(opts ...grpc.DialOption) { + +} +func (cli *K8sDR) CloseConn(conn *grpc.ClientConn) { + conn.Close() +} + +// do not use this method for call rpc +func (cli *K8sDR) GetClientLocalConns() map[string][]*grpc.ClientConn { + fmt.Println("should not call this function!!!!!!!!!!!!!!!!!!!!!!!!!") + return nil +} diff --git a/pkg/common/startrpc/start.go b/pkg/common/startrpc/start.go index 3ef6e569d..19da8a586 100644 --- a/pkg/common/startrpc/start.go +++ b/pkg/common/startrpc/start.go @@ -16,19 +16,16 @@ package startrpc import ( "fmt" + "github.com/openimsdk/open-im-server/v3/pkg/common/config" + "github.com/openimsdk/open-im-server/v3/pkg/common/discovery_register" "net" "strconv" - "time" - - "github.com/openimsdk/open-im-server/v3/pkg/common/config" grpcprometheus "github.com/grpc-ecosystem/go-grpc-prometheus" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" "github.com/OpenIMSDK/tools/discoveryregistry" - "github.com/OpenIMSDK/tools/discoveryregistry/zookeeper" - "github.com/OpenIMSDK/tools/log" "github.com/OpenIMSDK/tools/mw" "github.com/OpenIMSDK/tools/network" "github.com/OpenIMSDK/tools/prome" @@ -60,15 +57,17 @@ func Start( return err } defer listener.Close() - zkClient, err := zookeeper.NewClient(config.Config.Zookeeper.ZkAddr, config.Config.Zookeeper.Schema, - zookeeper.WithFreq(time.Hour), zookeeper.WithUserNameAndPassword( - config.Config.Zookeeper.Username, - config.Config.Zookeeper.Password, - ), zookeeper.WithRoundRobin(), zookeeper.WithTimeout(10), zookeeper.WithLogger(log.NewZkLogger())) + zkClient, err := discovery_register.NewDiscoveryRegister(config.Config.Envs.Discovery) + /* + zkClient, err := zookeeper.NewClient(config.Config.Zookeeper.ZkAddr, config.Config.Zookeeper.Schema, + zookeeper.WithFreq(time.Hour), zookeeper.WithUserNameAndPassword( + config.Config.Zookeeper.Username, + config.Config.Zookeeper.Password, + ), zookeeper.WithRoundRobin(), zookeeper.WithTimeout(10), zookeeper.WithLogger(log.NewZkLogger()))*/ if err != nil { return utils.Wrap1(err) } - defer zkClient.CloseZK() + //defer zkClient.CloseZK() zkClient.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials())) registerIP, err := network.GetRpcRegisterIP(config.Config.Rpc.RegisterIP) if err != nil {