Code adaptation k8s: service discovery and registration adaptation, configuration adaptation

pull/1145/head
lin.huang 2 years ago
parent 7a3c3d7939
commit 466da76647

@ -17,15 +17,13 @@ package main
import ( import (
"context" "context"
"fmt" "fmt"
"github.com/openimsdk/open-im-server/v3/pkg/common/discovery_register"
"net" "net"
"strconv"
"time"
_ "net/http/pprof" _ "net/http/pprof"
"strconv"
"github.com/OpenIMSDK/protocol/constant" "github.com/OpenIMSDK/protocol/constant"
"github.com/OpenIMSDK/tools/discoveryregistry" "github.com/OpenIMSDK/tools/discoveryregistry"
openkeeper "github.com/OpenIMSDK/tools/discoveryregistry/zookeeper"
"github.com/OpenIMSDK/tools/log" "github.com/OpenIMSDK/tools/log"
"github.com/openimsdk/open-im-server/v3/internal/api" "github.com/openimsdk/open-im-server/v3/internal/api"
@ -44,6 +42,7 @@ func main() {
} }
func run(port int) error { func run(port int) error {
fmt.Println("*****openimapi port:", port)
if port == 0 { if port == 0 {
return fmt.Errorf("port is empty") return fmt.Errorf("port is empty")
} }
@ -53,11 +52,13 @@ func run(port int) error {
} }
fmt.Println("api start init discov client") fmt.Println("api start init discov client")
var client discoveryregistry.SvcDiscoveryRegistry var client discoveryregistry.SvcDiscoveryRegistry
client, err = openkeeper.NewClient(config.Config.Zookeeper.ZkAddr, config.Config.Zookeeper.Schema, client, err = discovery_register.NewDiscoveryRegister(config.Config.Envs.Discovery)
openkeeper.WithFreq(time.Hour), openkeeper.WithUserNameAndPassword( /*
config.Config.Zookeeper.Username, client, err = openkeeper.NewClient(config.Config.Zookeeper.ZkAddr, config.Config.Zookeeper.Schema,
config.Config.Zookeeper.Password, openkeeper.WithFreq(time.Hour), openkeeper.WithUserNameAndPassword(
), openkeeper.WithRoundRobin(), openkeeper.WithTimeout(10), openkeeper.WithLogger(log.NewZkLogger())) config.Config.Zookeeper.Username,
config.Config.Zookeeper.Password,
), openkeeper.WithRoundRobin(), openkeeper.WithTimeout(10), openkeeper.WithLogger(log.NewZkLogger()))*/
if err != nil { if err != nil {
return err return err
} }

@ -21,7 +21,7 @@ import (
) )
func main() { func main() {
pushCmd := cmd.NewRpcCmd("push") pushCmd := cmd.NewRpcCmd(cmd.RpcPushServer)
pushCmd.AddPortFlag() pushCmd.AddPortFlag()
pushCmd.AddPrometheusPortFlag() pushCmd.AddPrometheusPortFlag()
if err := pushCmd.Exec(); err != nil { if err := pushCmd.Exec(); err != nil {

@ -21,7 +21,7 @@ import (
) )
func main() { func main() {
authCmd := cmd.NewRpcCmd("auth") authCmd := cmd.NewRpcCmd(cmd.RpcAuthServer)
authCmd.AddPortFlag() authCmd.AddPortFlag()
authCmd.AddPrometheusPortFlag() authCmd.AddPrometheusPortFlag()
if err := authCmd.Exec(); err != nil { if err := authCmd.Exec(); err != nil {

@ -21,7 +21,7 @@ import (
) )
func main() { func main() {
rpcCmd := cmd.NewRpcCmd("conversation") rpcCmd := cmd.NewRpcCmd(cmd.RpcConversationServer)
rpcCmd.AddPortFlag() rpcCmd.AddPortFlag()
rpcCmd.AddPrometheusPortFlag() rpcCmd.AddPrometheusPortFlag()
if err := rpcCmd.Exec(); err != nil { if err := rpcCmd.Exec(); err != nil {

@ -21,7 +21,7 @@ import (
) )
func main() { func main() {
rpcCmd := cmd.NewRpcCmd("friend") rpcCmd := cmd.NewRpcCmd(cmd.RpcFriendServer)
rpcCmd.AddPortFlag() rpcCmd.AddPortFlag()
rpcCmd.AddPrometheusPortFlag() rpcCmd.AddPrometheusPortFlag()
if err := rpcCmd.Exec(); err != nil { if err := rpcCmd.Exec(); err != nil {

@ -21,7 +21,7 @@ import (
) )
func main() { func main() {
rpcCmd := cmd.NewRpcCmd("group") rpcCmd := cmd.NewRpcCmd(cmd.RpcGroupServer)
rpcCmd.AddPortFlag() rpcCmd.AddPortFlag()
rpcCmd.AddPrometheusPortFlag() rpcCmd.AddPrometheusPortFlag()
if err := rpcCmd.Exec(); err != nil { if err := rpcCmd.Exec(); err != nil {

@ -21,7 +21,7 @@ import (
) )
func main() { func main() {
rpcCmd := cmd.NewRpcCmd("msg") rpcCmd := cmd.NewRpcCmd(cmd.RpcMsgServer)
rpcCmd.AddPortFlag() rpcCmd.AddPortFlag()
rpcCmd.AddPrometheusPortFlag() rpcCmd.AddPrometheusPortFlag()
if err := rpcCmd.Exec(); err != nil { if err := rpcCmd.Exec(); err != nil {

@ -21,7 +21,7 @@ import (
) )
func main() { func main() {
rpcCmd := cmd.NewRpcCmd("third") rpcCmd := cmd.NewRpcCmd(cmd.RpcThirdServer)
rpcCmd.AddPortFlag() rpcCmd.AddPortFlag()
rpcCmd.AddPrometheusPortFlag() rpcCmd.AddPrometheusPortFlag()
if err := rpcCmd.Exec(); err != nil { if err := rpcCmd.Exec(); err != nil {

@ -21,7 +21,7 @@ import (
) )
func main() { func main() {
rpcCmd := cmd.NewRpcCmd("user") rpcCmd := cmd.NewRpcCmd(cmd.RpcUserServer)
rpcCmd.AddPortFlag() rpcCmd.AddPortFlag()
rpcCmd.AddPrometheusPortFlag() rpcCmd.AddPrometheusPortFlag()
if err := rpcCmd.Exec(); err != nil { if err := rpcCmd.Exec(); err != nil {

@ -26,6 +26,10 @@
# Zookeeper address # Zookeeper address
# Zookeeper username # Zookeeper username
# Zookeeper password # Zookeeper password
envs:
discovery: zookeeper #k8s
serverTag: develop
zookeeper: zookeeper:
schema: openim schema: openim
address: [ 172.28.0.1:12181 ] address: [ 172.28.0.1:12181 ]
@ -173,15 +177,15 @@ rpcPort:
###################### RPC Register Name Configuration ###################### ###################### RPC Register Name Configuration ######################
# RPC service names for registration, it's not recommended to modify these # RPC service names for registration, it's not recommended to modify these
rpcRegisterName: rpcRegisterName:
openImUserName: User openImUserName: Rpc-User
openImFriendName: Friend openImFriendName: Rpc-Friend
openImMsgName: Msg openImMsgName: Rpc-Msg
openImPushName: Push openImPushName: Rpc-Push
openImMessageGatewayName: MessageGateway openImMessageGatewayName: Rpc-MessageGateway
openImGroupName: Group openImGroupName: Rpc-Group
openImAuthName: Auth openImAuthName: Rpc-Auth
openImConversationName: Conversation openImConversationName: Rpc-Conversation
openImThirdName: Third openImThirdName: Rpc-Third
###################### Log Configuration ###################### ###################### Log Configuration ######################
# Log configuration # Log configuration
@ -211,6 +215,7 @@ log:
# Websocket connection handshake timeout # Websocket connection handshake timeout
longConnSvr: longConnSvr:
openImWsPort: [ 10001 ] openImWsPort: [ 10001 ]
openImMessageGatewayPort: [ 10140 ]
websocketMaxConnNum: 100000 websocketMaxConnNum: 100000
websocketMaxMsgLen: 4096 websocketMaxMsgLen: 4096
websocketTimeout: 10 websocketTimeout: 10

@ -16,14 +16,11 @@ package msgtransfer
import ( import (
"fmt" "fmt"
"sync" "github.com/openimsdk/open-im-server/v3/pkg/common/discovery_register"
"time"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/credentials/insecure"
"sync"
openkeeper "github.com/OpenIMSDK/tools/discoveryregistry/zookeeper"
"github.com/OpenIMSDK/tools/log"
"github.com/OpenIMSDK/tools/mw" "github.com/OpenIMSDK/tools/mw"
"github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/config"
@ -62,9 +59,11 @@ func StartTransfer(prometheusPort int) error {
if err := mongo.CreateMsgIndex(); err != nil { if err := mongo.CreateMsgIndex(); err != nil {
return err return err
} }
client, err := openkeeper.NewClient(config.Config.Zookeeper.ZkAddr, config.Config.Zookeeper.Schema, client, err := discovery_register.NewDiscoveryRegister(config.Config.Envs.Discovery)
openkeeper.WithFreq(time.Hour), openkeeper.WithRoundRobin(), openkeeper.WithUserNameAndPassword(config.Config.Zookeeper.Username, /*
config.Config.Zookeeper.Password), openkeeper.WithTimeout(10), openkeeper.WithLogger(log.NewZkLogger())) client, err := openkeeper.NewClient(config.Config.Zookeeper.ZkAddr, config.Config.Zookeeper.Schema,
openkeeper.WithFreq(time.Hour), openkeeper.WithRoundRobin(), openkeeper.WithUserNameAndPassword(config.Config.Zookeeper.Username,
config.Config.Zookeeper.Password), openkeeper.WithTimeout(10), openkeeper.WithLogger(log.NewZkLogger()))*/
if err != nil { if err != nil {
return err return err
} }

@ -17,14 +17,12 @@ package tools
import ( import (
"context" "context"
"fmt" "fmt"
"math" "github.com/openimsdk/open-im-server/v3/pkg/common/discovery_register"
"time"
"github.com/redis/go-redis/v9" "github.com/redis/go-redis/v9"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/credentials/insecure"
"math"
"github.com/OpenIMSDK/tools/discoveryregistry/zookeeper"
"github.com/OpenIMSDK/tools/errs" "github.com/OpenIMSDK/tools/errs"
"github.com/OpenIMSDK/tools/log" "github.com/OpenIMSDK/tools/log"
"github.com/OpenIMSDK/tools/mcontext" "github.com/OpenIMSDK/tools/mcontext"
@ -74,9 +72,11 @@ func InitMsgTool() (*MsgTool, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
discov, err := zookeeper.NewClient(config.Config.Zookeeper.ZkAddr, config.Config.Zookeeper.Schema, discov, err := discovery_register.NewDiscoveryRegister(config.Config.Envs.Discovery)
zookeeper.WithFreq(time.Hour), zookeeper.WithRoundRobin(), zookeeper.WithUserNameAndPassword(config.Config.Zookeeper.Username, /*
config.Config.Zookeeper.Password), zookeeper.WithTimeout(10), zookeeper.WithLogger(log.NewZkLogger())) discov, err := zookeeper.NewClient(config.Config.Zookeeper.ZkAddr, config.Config.Zookeeper.Schema,
zookeeper.WithFreq(time.Hour), zookeeper.WithRoundRobin(), zookeeper.WithUserNameAndPassword(config.Config.Zookeeper.Username,
config.Config.Zookeeper.Password), zookeeper.WithTimeout(10), zookeeper.WithLogger(log.NewZkLogger()))*/
if err != nil { if err != nil {
return nil, err return nil, err
} }

@ -14,14 +14,21 @@
package cmd package cmd
import "github.com/spf13/cobra" import (
"fmt"
"github.com/OpenIMSDK/protocol/constant"
config2 "github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/spf13/cobra"
)
type ApiCmd struct { type ApiCmd struct {
*RootCmd *RootCmd
} }
func NewApiCmd() *ApiCmd { func NewApiCmd() *ApiCmd {
return &ApiCmd{NewRootCmd("api")} ret := &ApiCmd{NewRootCmd("api")}
ret.SetRootCmdPt(ret)
return ret
} }
func (a *ApiCmd) AddApi(f func(port int) error) { func (a *ApiCmd) AddApi(f func(port int) error) {
@ -29,3 +36,11 @@ func (a *ApiCmd) AddApi(f func(port int) error) {
return f(a.getPortFlag(cmd)) return f(a.getPortFlag(cmd))
} }
} }
func (a *ApiCmd) GetPortFromConfig(portType string) int {
fmt.Println("GetPortFromConfig:", portType)
if portType == constant.FlagPort {
return config2.Config.Api.OpenImApiPort[0]
} else {
return 0
}
}

@ -0,0 +1,12 @@
package cmd
const (
RpcPushServer = "push"
RpcAuthServer = "auth"
RpcConversationServer = "conversation"
RpcFriendServer = "friend"
RpcGroupServer = "group"
RpcMsgServer = "msg"
RpcThirdServer = "third"
RpcUserServer = "user"
)

@ -21,7 +21,9 @@ type CronTaskCmd struct {
} }
func NewCronTaskCmd() *CronTaskCmd { func NewCronTaskCmd() *CronTaskCmd {
return &CronTaskCmd{NewRootCmd("cronTask", WithCronTaskLogName())} ret := &CronTaskCmd{NewRootCmd("cronTask", WithCronTaskLogName())}
ret.SetRootCmdPt(ret)
return ret
} }
func (c *CronTaskCmd) addRunE(f func() error) { func (c *CronTaskCmd) addRunE(f func() error) {

@ -16,6 +16,8 @@ package cmd
import ( import (
"github.com/openimsdk/open-im-server/v3/internal/msggateway" "github.com/openimsdk/open-im-server/v3/internal/msggateway"
config2 "github.com/openimsdk/open-im-server/v3/pkg/common/config"
//"github.com/openimsdk/open-im-server/internal/msggateway". //"github.com/openimsdk/open-im-server/internal/msggateway".
"github.com/spf13/cobra" "github.com/spf13/cobra"
@ -26,8 +28,10 @@ type MsgGatewayCmd struct {
*RootCmd *RootCmd
} }
func NewMsgGatewayCmd() MsgGatewayCmd { func NewMsgGatewayCmd() *MsgGatewayCmd {
return MsgGatewayCmd{NewRootCmd("msgGateway")} ret := &MsgGatewayCmd{NewRootCmd("msgGateway")}
ret.SetRootCmdPt(ret)
return ret
} }
func (m *MsgGatewayCmd) AddWsPortFlag() { func (m *MsgGatewayCmd) AddWsPortFlag() {
@ -36,6 +40,9 @@ func (m *MsgGatewayCmd) AddWsPortFlag() {
func (m *MsgGatewayCmd) getWsPortFlag(cmd *cobra.Command) int { func (m *MsgGatewayCmd) getWsPortFlag(cmd *cobra.Command) int {
port, _ := cmd.Flags().GetInt(constant.FlagWsPort) port, _ := cmd.Flags().GetInt(constant.FlagWsPort)
if port == 0 {
port = m.PortFromConfig(constant.FlagWsPort)
}
return port return port
} }
@ -49,3 +56,14 @@ func (m *MsgGatewayCmd) Exec() error {
m.addRunE() m.addRunE()
return m.Execute() return m.Execute()
} }
func (m *MsgGatewayCmd) GetPortFromConfig(portType string) int {
if portType == constant.FlagWsPort {
return config2.Config.LongConnSvr.OpenImWsPort[0]
} else if portType == constant.FlagPort {
return config2.Config.LongConnSvr.OpenImMessageGatewayPort[0]
} else if portType == constant.FlagPrometheusPort {
return 0
} else {
return 0
}
}

@ -24,8 +24,10 @@ type MsgTransferCmd struct {
*RootCmd *RootCmd
} }
func NewMsgTransferCmd() MsgTransferCmd { func NewMsgTransferCmd() *MsgTransferCmd {
return MsgTransferCmd{NewRootCmd("msgTransfer")} ret := &MsgTransferCmd{NewRootCmd("msgTransfer")}
ret.SetRootCmdPt(ret)
return ret
} }
func (m *MsgTransferCmd) addRunE() { func (m *MsgTransferCmd) addRunE() {

@ -26,11 +26,15 @@ import (
"github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/config"
) )
type RootCmdPt interface {
GetPortFromConfig(portType string) int
}
type RootCmd struct { type RootCmd struct {
Command cobra.Command Command cobra.Command
Name string Name string
port int port int
prometheusPort int prometheusPort int
cmdItf RootCmdPt
} }
type CmdOpts struct { type CmdOpts struct {
@ -76,7 +80,9 @@ func NewRootCmd(name string, opts ...func(*CmdOpts)) (rootCmd *RootCmd) {
rootCmd.addConfFlag() rootCmd.addConfFlag()
return rootCmd return rootCmd
} }
func (r *RootCmd) SetRootCmdPt(cmdItf RootCmdPt) {
r.cmdItf = cmdItf
}
func (r *RootCmd) addConfFlag() { func (r *RootCmd) addConfFlag() {
r.Command.Flags().StringP(constant.FlagConf, "c", "", "Path to config file folder") r.Command.Flags().StringP(constant.FlagConf, "c", "", "Path to config file folder")
} }
@ -87,6 +93,9 @@ func (r *RootCmd) AddPortFlag() {
func (r *RootCmd) getPortFlag(cmd *cobra.Command) int { func (r *RootCmd) getPortFlag(cmd *cobra.Command) int {
port, _ := cmd.Flags().GetInt(constant.FlagPort) port, _ := cmd.Flags().GetInt(constant.FlagPort)
if port == 0 {
port = r.PortFromConfig(constant.FlagPort)
}
return port return port
} }
@ -100,6 +109,9 @@ func (r *RootCmd) AddPrometheusPortFlag() {
func (r *RootCmd) getPrometheusPortFlag(cmd *cobra.Command) int { func (r *RootCmd) getPrometheusPortFlag(cmd *cobra.Command) int {
port, _ := cmd.Flags().GetInt(constant.FlagPrometheusPort) port, _ := cmd.Flags().GetInt(constant.FlagPrometheusPort)
if port == 0 {
port = r.PortFromConfig(constant.FlagPrometheusPort)
}
return port return port
} }
@ -120,3 +132,12 @@ func (r *RootCmd) Execute() error {
func (r *RootCmd) AddCommand(cmds ...*cobra.Command) { func (r *RootCmd) AddCommand(cmds ...*cobra.Command) {
r.Command.AddCommand(cmds...) r.Command.AddCommand(cmds...)
} }
func (r *RootCmd) GetPortFromConfig(portType string) int {
fmt.Println("RootCmd.GetPortFromConfig:", portType)
return 0
}
func (r *RootCmd) PortFromConfig(portType string) int {
fmt.Println("PortFromConfig:", portType)
return r.cmdItf.GetPortFromConfig(portType)
}

@ -16,7 +16,8 @@ package cmd
import ( import (
"errors" "errors"
"github.com/OpenIMSDK/protocol/constant"
config2 "github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/spf13/cobra" "github.com/spf13/cobra"
"google.golang.org/grpc" "google.golang.org/grpc"
@ -30,8 +31,9 @@ type RpcCmd struct {
} }
func NewRpcCmd(name string) *RpcCmd { func NewRpcCmd(name string) *RpcCmd {
authCmd := &RpcCmd{NewRootCmd(name)} ret := &RpcCmd{NewRootCmd(name)}
return authCmd ret.SetRootCmdPt(ret)
return ret
} }
func (a *RpcCmd) Exec() error { func (a *RpcCmd) Exec() error {
@ -51,3 +53,40 @@ func (a *RpcCmd) StartSvr(
} }
return startrpc.Start(a.GetPortFlag(), name, a.GetPrometheusPortFlag(), rpcFn) return startrpc.Start(a.GetPortFlag(), name, a.GetPrometheusPortFlag(), rpcFn)
} }
func (a *RpcCmd) GetPortFromConfig(portType string) int {
switch a.Name {
case RpcPushServer:
if portType == constant.FlagPort {
return config2.Config.RpcPort.OpenImPushPort[0]
}
case RpcAuthServer:
if portType == constant.FlagPort {
return config2.Config.RpcPort.OpenImAuthPort[0]
}
case RpcConversationServer:
if portType == constant.FlagPort {
return config2.Config.RpcPort.OpenImConversationPort[0]
}
case RpcFriendServer:
if portType == constant.FlagPort {
return config2.Config.RpcPort.OpenImFriendPort[0]
}
case RpcGroupServer:
if portType == constant.FlagPort {
return config2.Config.RpcPort.OpenImGroupPort[0]
}
case RpcMsgServer:
if portType == constant.FlagPort {
return config2.Config.RpcPort.OpenImMessageGatewayPort[0]
}
case RpcThirdServer:
if portType == constant.FlagPort {
return config2.Config.RpcPort.OpenImThirdPort[0]
}
case RpcUserServer:
if portType == constant.FlagPort {
return config2.Config.RpcPort.OpenImUserPort[0]
}
}
return 0
}

@ -46,6 +46,9 @@ type POfflinePush struct {
} }
type configStruct struct { type configStruct struct {
Envs struct {
Discovery string `yaml:"discovery"`
}
Zookeeper struct { Zookeeper struct {
Schema string `yaml:"schema"` Schema string `yaml:"schema"`
ZkAddr []string `yaml:"address"` ZkAddr []string `yaml:"address"`
@ -181,10 +184,11 @@ type configStruct struct {
} `yaml:"log"` } `yaml:"log"`
LongConnSvr struct { LongConnSvr struct {
OpenImWsPort []int `yaml:"openImWsPort"` OpenImMessageGatewayPort []int `yaml:"openImMessageGatewayPort"`
WebsocketMaxConnNum int `yaml:"websocketMaxConnNum"` OpenImWsPort []int `yaml:"openImWsPort"`
WebsocketMaxMsgLen int `yaml:"websocketMaxMsgLen"` WebsocketMaxConnNum int `yaml:"websocketMaxConnNum"`
WebsocketTimeout int `yaml:"websocketTimeout"` WebsocketMaxMsgLen int `yaml:"websocketMaxMsgLen"`
WebsocketTimeout int `yaml:"websocketTimeout"`
} `yaml:"longConnSvr"` } `yaml:"longConnSvr"`
Push struct { Push struct {

@ -0,0 +1,90 @@
package discovery_register
import (
"context"
"errors"
"fmt"
"github.com/OpenIMSDK/tools/discoveryregistry"
openkeeper "github.com/OpenIMSDK/tools/discoveryregistry/zookeeper"
"github.com/OpenIMSDK/tools/log"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"google.golang.org/grpc"
"net"
"strconv"
"time"
)
func NewDiscoveryRegister(envType string) (discoveryregistry.SvcDiscoveryRegistry, error) {
var client discoveryregistry.SvcDiscoveryRegistry
var err error
switch envType {
case "zookeeper":
client, err = openkeeper.NewClient(config.Config.Zookeeper.ZkAddr, config.Config.Zookeeper.Schema,
openkeeper.WithFreq(time.Hour), openkeeper.WithUserNameAndPassword(
config.Config.Zookeeper.Username,
config.Config.Zookeeper.Password,
), openkeeper.WithRoundRobin(), openkeeper.WithTimeout(10), openkeeper.WithLogger(log.NewZkLogger()))
case "k8s":
client, err = NewK8sDiscoveryRegister()
default:
client = nil
err = errors.New("envType not correct")
}
return client, err
}
type K8sDR struct {
rpcRegisterAddr string
}
func NewK8sDiscoveryRegister() (discoveryregistry.SvcDiscoveryRegistry, error) {
return &K8sDR{}, nil
}
func (cli *K8sDR) Register(serviceName, host string, port int, opts ...grpc.DialOption) error {
cli.rpcRegisterAddr = net.JoinHostPort(host, strconv.Itoa(port))
return nil
}
func (cli *K8sDR) UnRegister() error {
return nil
}
func (cli *K8sDR) CreateRpcRootNodes(serviceNames []string) error {
return nil
}
func (cli *K8sDR) RegisterConf2Registry(key string, conf []byte) error {
return nil
}
func (cli *K8sDR) GetConfFromRegistry(key string) ([]byte, error) {
return nil, nil
}
func (cli *K8sDR) GetConns(ctx context.Context, serviceName string, opts ...grpc.DialOption) ([]*grpc.ClientConn, error) {
conn, err := grpc.DialContext(ctx, serviceName, grpc.WithInsecure())
return []*grpc.ClientConn{conn}, err
}
func (cli *K8sDR) GetConn(ctx context.Context, serviceName string, opts ...grpc.DialOption) (*grpc.ClientConn, error) {
return grpc.DialContext(ctx, serviceName, grpc.WithInsecure())
}
func (cli *K8sDR) GetSelfConnTarget() string {
return cli.rpcRegisterAddr
}
func (cli *K8sDR) AddOption(opts ...grpc.DialOption) {
}
func (cli *K8sDR) CloseConn(conn *grpc.ClientConn) {
conn.Close()
}
// do not use this method for call rpc
func (cli *K8sDR) GetClientLocalConns() map[string][]*grpc.ClientConn {
fmt.Println("should not call this function!!!!!!!!!!!!!!!!!!!!!!!!!")
return nil
}

@ -16,19 +16,16 @@ package startrpc
import ( import (
"fmt" "fmt"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/discovery_register"
"net" "net"
"strconv" "strconv"
"time"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
grpcprometheus "github.com/grpc-ecosystem/go-grpc-prometheus" grpcprometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/credentials/insecure"
"github.com/OpenIMSDK/tools/discoveryregistry" "github.com/OpenIMSDK/tools/discoveryregistry"
"github.com/OpenIMSDK/tools/discoveryregistry/zookeeper"
"github.com/OpenIMSDK/tools/log"
"github.com/OpenIMSDK/tools/mw" "github.com/OpenIMSDK/tools/mw"
"github.com/OpenIMSDK/tools/network" "github.com/OpenIMSDK/tools/network"
"github.com/OpenIMSDK/tools/prome" "github.com/OpenIMSDK/tools/prome"
@ -60,15 +57,17 @@ func Start(
return err return err
} }
defer listener.Close() defer listener.Close()
zkClient, err := zookeeper.NewClient(config.Config.Zookeeper.ZkAddr, config.Config.Zookeeper.Schema, zkClient, err := discovery_register.NewDiscoveryRegister(config.Config.Envs.Discovery)
zookeeper.WithFreq(time.Hour), zookeeper.WithUserNameAndPassword( /*
config.Config.Zookeeper.Username, zkClient, err := zookeeper.NewClient(config.Config.Zookeeper.ZkAddr, config.Config.Zookeeper.Schema,
config.Config.Zookeeper.Password, zookeeper.WithFreq(time.Hour), zookeeper.WithUserNameAndPassword(
), zookeeper.WithRoundRobin(), zookeeper.WithTimeout(10), zookeeper.WithLogger(log.NewZkLogger())) config.Config.Zookeeper.Username,
config.Config.Zookeeper.Password,
), zookeeper.WithRoundRobin(), zookeeper.WithTimeout(10), zookeeper.WithLogger(log.NewZkLogger()))*/
if err != nil { if err != nil {
return utils.Wrap1(err) return utils.Wrap1(err)
} }
defer zkClient.CloseZK() //defer zkClient.CloseZK()
zkClient.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials())) zkClient.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()))
registerIP, err := network.GetRpcRegisterIP(config.Config.Rpc.RegisterIP) registerIP, err := network.GetRpcRegisterIP(config.Config.Rpc.RegisterIP)
if err != nil { if err != nil {

Loading…
Cancel
Save