diff --git a/cmd/openim-rpc/openim-rpc-auth/main.go b/cmd/openim-rpc/openim-rpc-auth/main.go index d00b2ad7d..768989327 100644 --- a/cmd/openim-rpc/openim-rpc-auth/main.go +++ b/cmd/openim-rpc/openim-rpc-auth/main.go @@ -21,11 +21,7 @@ import ( ) func main() { - authCmd := cmd.NewRpcCmd(cmd.RpcAuthServer, auth.Start) - authCmd.AddPortFlag() - authCmd.AddPrometheusPortFlag() - if err := authCmd.Exec(); err != nil { + if err := cmd.NewAuthRpcCmd(auth.Start).Exec(); err != nil { program.ExitWithError(err) - } -} \ No newline at end of file +} diff --git a/config/zookeeper.yml b/config/zookeeper.yml index cdd61626d..ec740af16 100644 --- a/config/zookeeper.yml +++ b/config/zookeeper.yml @@ -4,6 +4,7 @@ address: [ 172.28.0.1:12181 ] username: '' password: '' +env: zookeeper rpcRegisterName: User: User Friend: Friend diff --git a/internal/rpc/auth/auth.go b/internal/rpc/auth/auth.go index b78cb9b1e..36e973eda 100644 --- a/internal/rpc/auth/auth.go +++ b/internal/rpc/auth/auth.go @@ -16,10 +16,10 @@ package auth import ( "context" + "github.com/openimsdk/open-im-server/v3/pkg/common/cmd" "github.com/openimsdk/tools/db/redisutil" "github.com/openimsdk/open-im-server/v3/pkg/authverify" - "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/db/cache" "github.com/openimsdk/open-im-server/v3/pkg/common/db/controller" "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" @@ -40,22 +40,22 @@ type authServer struct { authDatabase controller.AuthDatabase userRpcClient *rpcclient.UserRpcClient RegisterCenter discovery.SvcDiscoveryRegistry - config *config.GlobalConfig + config *cmd.AuthConfig } -func Start(ctx context.Context, config *config.GlobalConfig, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error { - rdb, err := redisutil.NewRedisClient(ctx, config.Redis.Build()) +func Start(ctx context.Context, config *cmd.AuthConfig, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error { + rdb, err := redisutil.NewRedisClient(ctx, config.RedisConfig.Build()) if err != nil { return err } - userRpcClient := rpcclient.NewUserRpcClient(client, config.RpcRegisterName.OpenImUserName, &config.Manager, &config.IMAdmin) + userRpcClient := rpcclient.NewUserRpcClient(client, config.ZookeeperConfig.RpcRegisterName.User, &config.Manager, &config.IMAdmin) pbauth.RegisterAuthServer(server, &authServer{ userRpcClient: &userRpcClient, RegisterCenter: client, authDatabase: controller.NewAuthDatabase( cache.NewTokenCacheModel(rdb), - config.Secret, - config.TokenPolicy.Expire, + config.RpcConfig.Secret, + config.RpcConfig.TokenPolicy.Expire, ), config: config, }) diff --git a/pkg/common/cmd/constant.go b/pkg/common/cmd/constant.go index 0281b4f14..fe60caaad 100644 --- a/pkg/common/cmd/constant.go +++ b/pkg/common/cmd/constant.go @@ -28,3 +28,40 @@ const ( MsgGatewayServer = "msgGateway" MsgTransferServer = "msgTransfer" ) +const ( + FileName = "config.yaml" + NotificationFileName = "notification.yaml" + KafkaConfigFileName = "kafka.yml" + RedisConfigFileName = "redis.yml" + WebhooksConfigFileName = "webhooks.yml" + ZookeeperConfigFileName = "zookeeper.yml" + MongodbConfigFileName = "mongodb.yml" + MinioConfigFileName = "minio.yml" + LogConfigFileName = "log.yml" + OpenIMAPICfgFileName = "openim-api.yml" + OpenIMCronTaskCfgFileName = "openim-crontask.yml" + OpenIMMsgGatewayCfgFileName = "openim-msg-gateway.yml" + OpenIMMsgTransferCfgFileName = "openim-msg-transfer.yml" + OpenIMPushCfgFileName = "openim-push.yml" + OpenIMRPCAuthCfgFileName = "openim-rpc-auth.yml" + OpenIMRPCConversationCfgFileName = "openim-rpc-conversation.yml" + OpenIMRPCFriendCfgFileName = "openim-rpc-friend.yml" + OpenIMRPCGroupCfgFileName = "openim-rpc-group.yml" + OpenIMRPCMsgCfgFileName = "openim-rpc-msg.yml" + OpenIMRPCThirdCfgFileName = "openim-rpc-third.yml" + OpenIMRPCUserCfgFileName = "openim-rpc-user.yml" +) + +const ( + logEnvPrefix = "openim-log" + redisEnvPrefix = "openim-redis" + mongodbEnvPrefix = "openim-mongodb" + zoopkeeperEnvPrefix = "openim-zookeeper" + authEnvPrefix = "openim-auth" +) + +const ( + FlagConf = "config_folder_path" + + FlagTransferIndex = "index" +) diff --git a/pkg/common/cmd/root.go b/pkg/common/cmd/root.go index 071aecc9a..4b1da90bd 100644 --- a/pkg/common/cmd/root.go +++ b/pkg/common/cmd/root.go @@ -16,35 +16,38 @@ package cmd import ( "fmt" + "path/filepath" - "github.com/openimsdk/open-im-server/v3/pkg/common/config" config2 "github.com/openimsdk/open-im-server/v3/pkg/common/config" - "github.com/openimsdk/protocol/constant" "github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/log" "github.com/spf13/cobra" ) -type RootCmdPt interface { - GetPortFromConfig(portType string) int -} - type RootCmd struct { Command cobra.Command - Name string processName string port int prometheusPort int - cmdItf RootCmdPt - config *config.GlobalConfig + log *config2.Log + index int +} + +func (r *RootCmd) Index() int { + return r.index } -func (rc *RootCmd) Port() int { - return rc.port +func (r *RootCmd) Port() int { + return r.port } type CmdOpts struct { loggerPrefixName string + configMap map[string]StructEnvPrefix +} +type StructEnvPrefix struct { + EnvPrefix string + ConfigStruct any } func WithCronTaskLogName() func(*CmdOpts) { @@ -58,13 +61,17 @@ func WithLogName(logName string) func(*CmdOpts) { opts.loggerPrefixName = logName } } +func WithConfigMap(configMap map[string]StructEnvPrefix) func(*CmdOpts) { + return func(opts *CmdOpts) { + opts.configMap = configMap + } +} -func NewRootCmd(processName, name string, opts ...func(*CmdOpts)) *RootCmd { - rootCmd := &RootCmd{processName: processName, Name: name, config: config.NewGlobalConfig()} +func NewRootCmd(processName string, opts ...func(*CmdOpts)) *RootCmd { + rootCmd := &RootCmd{processName: processName} cmd := cobra.Command{ - Use: "Start openIM application", - Short: fmt.Sprintf(`Start %s `, name), - Long: fmt.Sprintf(`Start %s `, name), + Use: "Start openIM application", + Long: fmt.Sprintf(`Start %s `, processName), PersistentPreRunE: func(cmd *cobra.Command, args []string) error { return rootCmd.persistentPreRun(cmd, opts...) }, @@ -72,29 +79,39 @@ func NewRootCmd(processName, name string, opts ...func(*CmdOpts)) *RootCmd { SilenceErrors: true, } rootCmd.Command = cmd - rootCmd.addConfFlag() return rootCmd } -func (rc *RootCmd) persistentPreRun(cmd *cobra.Command, opts ...func(*CmdOpts)) error { - if err := rc.initializeConfiguration(cmd); err != nil { +func (r *RootCmd) persistentPreRun(cmd *cobra.Command, opts ...func(*CmdOpts)) error { + cmdOpts := r.applyOptions(opts...) + if err := r.initializeConfiguration(cmd, cmdOpts); err != nil { return err } - cmdOpts := rc.applyOptions(opts...) - - if err := rc.initializeLogger(cmdOpts); err != nil { + if err := r.initializeLogger(cmdOpts); err != nil { return errs.WrapMsg(err, "failed to initialize logger") } return nil } -func (rc *RootCmd) initializeConfiguration(cmd *cobra.Command) error { - return rc.getConfFromCmdAndInit(cmd) +func (r *RootCmd) initializeConfiguration(cmd *cobra.Command, opts *CmdOpts) error { + configDirectory, _, err := r.getFlag(cmd) + if err != nil { + return err + } + for configFileName, structEnvPrefix := range opts.configMap { + err := config2.LoadConfig(filepath.Join(configDirectory, configFileName), + structEnvPrefix.EnvPrefix, structEnvPrefix.ConfigStruct) + if err != nil { + return err + } + } + return config2.LoadConfig(filepath.Join(configDirectory, LogConfigFileName), + logEnvPrefix, r.log) } -func (rc *RootCmd) applyOptions(opts ...func(*CmdOpts)) *CmdOpts { +func (r *RootCmd) applyOptions(opts ...func(*CmdOpts)) *CmdOpts { cmdOpts := defaultCmdOpts() for _, opt := range opts { opt(cmdOpts) @@ -103,25 +120,23 @@ func (rc *RootCmd) applyOptions(opts ...func(*CmdOpts)) *CmdOpts { return cmdOpts } -func (rc *RootCmd) initializeLogger(cmdOpts *CmdOpts) error { - logConfig := rc.config.Log - +func (r *RootCmd) initializeLogger(cmdOpts *CmdOpts) error { err := log.InitFromConfig( cmdOpts.loggerPrefixName, - rc.processName, - logConfig.RemainLogLevel, - logConfig.IsStdout, - logConfig.IsJson, - logConfig.StorageLocation, - logConfig.RemainRotationCount, - logConfig.RotationTime, + r.processName, + r.log.RemainLogLevel, + r.log.IsStdout, + r.log.IsJson, + r.log.StorageLocation, + r.log.RemainRotationCount, + r.log.RotationTime, config2.Version, ) if err != nil { return errs.Wrap(err) } - return errs.Wrap(log.InitConsoleLogger(rc.processName, logConfig.RemainLogLevel, logConfig.IsJson, config2.Version)) + return errs.Wrap(log.InitConsoleLogger(r.processName, r.log.RemainLogLevel, r.log.IsJson, config2.Version)) } @@ -131,69 +146,21 @@ func defaultCmdOpts() *CmdOpts { } } -func (r *RootCmd) SetRootCmdPt(cmdItf RootCmdPt) { - r.cmdItf = cmdItf -} - -func (r *RootCmd) addConfFlag() { - r.Command.Flags().StringP(constant.FlagConf, "c", "", "path to config file folder") -} - -func (r *RootCmd) AddPortFlag() { - r.Command.Flags().IntP(constant.FlagPort, "p", 0, "server listen port") -} - -func (r *RootCmd) getPortFlag(cmd *cobra.Command) int { - port, err := cmd.Flags().GetInt(constant.FlagPort) +func (r *RootCmd) getFlag(cmd *cobra.Command) (string, int, error) { + r.Command.Flags().StringP(FlagConf, "c", "", "path of config directory") + configDirectory, err := cmd.Flags().GetString(FlagConf) if err != nil { - // Wrapping the error with additional context - return 0 - } - if port == 0 { - port = r.PortFromConfig(constant.FlagPort) + return "", 0, errs.Wrap(err) } - return port -} - -// // GetPortFlag returns the port flag. -func (r *RootCmd) GetPortFlag() int { - return r.port -} - -func (r *RootCmd) AddPrometheusPortFlag() { - r.Command.Flags().IntP(constant.FlagPrometheusPort, "", 0, "server prometheus listen port") -} - -func (r *RootCmd) getPrometheusPortFlag(cmd *cobra.Command) int { - port, err := cmd.Flags().GetInt(constant.FlagPrometheusPort) - if err != nil || port == 0 { - port = r.PortFromConfig(constant.FlagPrometheusPort) - if err != nil { - return 0 - } + r.Command.Flags().IntP(FlagTransferIndex, "i", 0, "process startup sequence number") + index, err := cmd.Flags().GetInt(FlagTransferIndex) + if err != nil { + return "", 0, errs.Wrap(err) } - return port -} - -func (r *RootCmd) GetPrometheusPortFlag() int { - return r.prometheusPort -} - -func (r *RootCmd) getConfFromCmdAndInit(cmdLines *cobra.Command) error { - configFolderPath, _ := cmdLines.Flags().GetString(constant.FlagConf) - return config2.InitConfig(r.config, configFolderPath) + r.index = index + return configDirectory, index, nil } func (r *RootCmd) Execute() error { return r.Command.Execute() } - -func (r *RootCmd) AddCommand(cmds ...*cobra.Command) { - r.Command.AddCommand(cmds...) -} - -func (r *RootCmd) PortFromConfig(portType string) int { - // Retrieve the port and cache it - port := r.cmdItf.GetPortFromConfig(portType) - return port -} diff --git a/pkg/common/cmd/rpc.go b/pkg/common/cmd/rpc.go index e550708ed..5c83f7d68 100644 --- a/pkg/common/cmd/rpc.go +++ b/pkg/common/cmd/rpc.go @@ -18,141 +18,46 @@ import ( "context" config2 "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/startrpc" - "github.com/openimsdk/protocol/constant" "github.com/openimsdk/tools/discovery" - "github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/system/program" - "github.com/spf13/cobra" "google.golang.org/grpc" ) -type rpcInitFuc func(ctx context.Context, config *config2.GlobalConfig, disCov discovery.SvcDiscoveryRegistry, server *grpc.Server) error +type rpcInitFuc func(ctx context.Context, config *AuthConfig, disCov discovery.SvcDiscoveryRegistry, server *grpc.Server) error -type RpcCmd struct { +type AuthRpcCmd struct { *RootCmd - RpcRegisterName string - initFunc rpcInitFuc - ctx context.Context -} - -func NewRpcCmd(name string, initFunc rpcInitFuc) *RpcCmd { - ret := &RpcCmd{RootCmd: NewRootCmd(program.GetProcessName(), name), initFunc: initFunc} + initFunc rpcInitFuc + ctx context.Context + configMap map[string]StructEnvPrefix + authConfig AuthConfig +} +type AuthConfig struct { + RpcConfig config2.Auth + RedisConfig config2.Redis + ZookeeperConfig config2.ZooKeeper +} + +func NewAuthRpcCmd(initFunc rpcInitFuc) *AuthRpcCmd { + var authConfig AuthConfig + ret := &AuthRpcCmd{initFunc: initFunc, authConfig: authConfig} + ret.configMap = map[string]StructEnvPrefix{ + OpenIMRPCAuthCfgFileName: {EnvPrefix: authEnvPrefix, ConfigStruct: &authConfig.RpcConfig}, + RedisConfigFileName: {EnvPrefix: redisEnvPrefix, ConfigStruct: &authConfig.RedisConfig}, + ZookeeperConfigFileName: {EnvPrefix: zoopkeeperEnvPrefix, ConfigStruct: &authConfig.ZookeeperConfig}, + } + ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap)) ret.ctx = context.WithValue(context.Background(), "version", config2.Version) - ret.addPreRun() - ret.addRunE() - ret.SetRootCmdPt(ret) + ret.RunE() return ret } -func (a *RpcCmd) addPreRun() { - a.Command.PreRun = func(cmd *cobra.Command, args []string) { - a.port = a.getPortFlag(cmd) - a.prometheusPort = a.getPrometheusPortFlag(cmd) - } -} - -func (a *RpcCmd) addRunE() { - a.Command.RunE = func(cmd *cobra.Command, args []string) error { - rpcRegisterName, err := a.GetRpcRegisterNameFromConfig() - if err != nil { - return err - } else { - return a.StartSvr(rpcRegisterName, a.initFunc) - } - } -} - -func (a *RpcCmd) Exec() error { +func (a *AuthRpcCmd) Exec() error { return a.Execute() } -func (a *RpcCmd) StartSvr(name string, rpcFn func(ctx context.Context, config *config2.GlobalConfig, disCov discovery.SvcDiscoveryRegistry, server *grpc.Server) error) error { - if a.GetPortFlag() == 0 { - return errs.New("port is required").Wrap() - } - return startrpc.Start(a.ctx, a.GetPortFlag(), name, a.GetPrometheusPortFlag(), a.config, rpcFn) -} - -func (a *RpcCmd) GetPortFromConfig(portType string) int { - switch a.Name { - case RpcPushServer: - if portType == constant.FlagPort { - return a.config.RpcPort.OpenImPushPort[0] - } - if portType == constant.FlagPrometheusPort { - return a.config.Prometheus.PushPrometheusPort[0] - } - case RpcAuthServer: - if portType == constant.FlagPort { - return a.config.RpcPort.OpenImAuthPort[0] - } - if portType == constant.FlagPrometheusPort { - return a.config.Prometheus.AuthPrometheusPort[0] - } - case RpcConversationServer: - if portType == constant.FlagPort { - return a.config.RpcPort.OpenImConversationPort[0] - } - if portType == constant.FlagPrometheusPort { - return a.config.Prometheus.ConversationPrometheusPort[0] - } - case RpcFriendServer: - if portType == constant.FlagPort { - return a.config.RpcPort.OpenImFriendPort[0] - } - if portType == constant.FlagPrometheusPort { - return a.config.Prometheus.FriendPrometheusPort[0] - } - case RpcGroupServer: - if portType == constant.FlagPort { - return a.config.RpcPort.OpenImGroupPort[0] - } - if portType == constant.FlagPrometheusPort { - return a.config.Prometheus.GroupPrometheusPort[0] - } - case RpcMsgServer: - if portType == constant.FlagPort { - return a.config.RpcPort.OpenImMessagePort[0] - } - if portType == constant.FlagPrometheusPort { - return a.config.Prometheus.MessagePrometheusPort[0] - } - case RpcThirdServer: - if portType == constant.FlagPort { - return a.config.RpcPort.OpenImThirdPort[0] - } - if portType == constant.FlagPrometheusPort { - return a.config.Prometheus.ThirdPrometheusPort[0] - } - case RpcUserServer: - if portType == constant.FlagPort { - return a.config.RpcPort.OpenImUserPort[0] - } - if portType == constant.FlagPrometheusPort { - return a.config.Prometheus.UserPrometheusPort[0] - } - } - return 0 -} - -func (a *RpcCmd) GetRpcRegisterNameFromConfig() (string, error) { - switch a.Name { - case RpcPushServer: - return a.config.RpcRegisterName.OpenImPushName, nil - case RpcAuthServer: - return a.config.RpcRegisterName.OpenImAuthName, nil - case RpcConversationServer: - return a.config.RpcRegisterName.OpenImConversationName, nil - case RpcFriendServer: - return a.config.RpcRegisterName.OpenImFriendName, nil - case RpcGroupServer: - return a.config.RpcRegisterName.OpenImGroupName, nil - case RpcMsgServer: - return a.config.RpcRegisterName.OpenImMsgName, nil - case RpcThirdServer: - return a.config.RpcRegisterName.OpenImThirdName, nil - case RpcUserServer: - return a.config.RpcRegisterName.OpenImUserName, nil - } - return "", errs.New("unrecognized RPC server name", "rpcName", a.Name).Wrap() +func (a *AuthRpcCmd) RunE() error { + return startrpc.Start(a.ctx, &a.authConfig.ZookeeperConfig, &a.authConfig.RpcConfig.Prometheus, a.authConfig.RpcConfig.RPC.ListenIP, + a.authConfig.RpcConfig.RPC.RegisterIP, a.authConfig.RpcConfig.RPC.Ports, + a.Index(), a.authConfig.ZookeeperConfig.RpcRegisterName.Auth, &a.authConfig, a.initFunc) } diff --git a/pkg/common/config/config.go b/pkg/common/config/config.go index 48f62a297..02446eb7c 100644 --- a/pkg/common/config/config.go +++ b/pkg/common/config/config.go @@ -37,8 +37,8 @@ type LocalCache struct { type Log struct { StorageLocation string `mapstructure:"storageLocation"` - RotationTime int `mapstructure:"rotationTime"` - RemainRotationCount int `mapstructure:"remainRotationCount"` + RotationTime uint `mapstructure:"rotationTime"` + RemainRotationCount uint `mapstructure:"remainRotationCount"` RemainLogLevel int `mapstructure:"remainLogLevel"` IsStdout bool `mapstructure:"isStdout"` IsJson bool `mapstructure:"isJson"` @@ -133,16 +133,18 @@ type Notification struct { ConversationSetPrivate NotificationConfig `mapstructure:"conversationSetPrivate"` } +type Prometheus struct { + Enable bool `mapstructure:"enable"` + Ports []int `mapstructure:"ports"` +} + type MsgGateway struct { RPC struct { RegisterIP string `mapstructure:"registerIP"` Ports []int `mapstructure:"ports"` } `mapstructure:"rpc"` - Prometheus struct { - Enable bool `mapstructure:"enable"` - Ports []int `mapstructure:"ports"` - } `mapstructure:"prometheus"` - ListenIP string `mapstructure:"listenIP"` + Prometheus Prometheus `mapstructure:"prometheus"` + ListenIP string `mapstructure:"listenIP"` LongConnSvr struct { Ports []int `mapstructure:"ports"` WebsocketMaxConnNum int `mapstructure:"websocketMaxConnNum"` @@ -153,11 +155,8 @@ type MsgGateway struct { } type MsgTransfer struct { - Prometheus struct { - Enable bool `mapstructure:"enable"` - Ports []int `mapstructure:"ports"` - } `mapstructure:"prometheus"` - MsgCacheTimeout int `mapstructure:"msgCacheTimeout"` + Prometheus Prometheus `mapstructure:"prometheus"` + MsgCacheTimeout int `mapstructure:"msgCacheTimeout"` } type Push struct { @@ -166,12 +165,9 @@ type Push struct { ListenIP string `mapstructure:"listenIP"` Ports []int `mapstructure:"ports"` } `mapstructure:"rpc"` - Prometheus struct { - Enable bool `mapstructure:"enable"` - Ports []int `mapstructure:"ports"` - } `mapstructure:"prometheus"` - Enable string `mapstructure:"enable"` - GeTui struct { + Prometheus Prometheus `mapstructure:"prometheus"` + Enable string `mapstructure:"enable"` + GeTui struct { PushUrl string `mapstructure:"pushUrl"` MasterSecret string `mapstructure:"masterSecret"` AppKey string `mapstructure:"appKey"` @@ -201,12 +197,9 @@ type Auth struct { ListenIP string `mapstructure:"listenIP"` Ports []int `mapstructure:"ports"` } `mapstructure:"rpc"` - Prometheus struct { - Enable bool `mapstructure:"enable"` - Ports []int `mapstructure:"ports"` - } `mapstructure:"prometheus"` + Prometheus Prometheus `mapstructure:"prometheus"` TokenPolicy struct { - Expire int `mapstructure:"expire"` + Expire int64 `mapstructure:"expire"` } `mapstructure:"tokenPolicy"` Secret string `mapstructure:"secret"` } @@ -217,10 +210,7 @@ type Conversation struct { ListenIP string `mapstructure:"listenIP"` Ports []int `mapstructure:"ports"` } `mapstructure:"rpc"` - Prometheus struct { - Enable bool `mapstructure:"enable"` - Ports []int `mapstructure:"ports"` - } `mapstructure:"prometheus"` + Prometheus Prometheus `mapstructure:"prometheus"` } type Friend struct { @@ -229,10 +219,7 @@ type Friend struct { ListenIP string `mapstructure:"listenIP"` Ports []int `mapstructure:"ports"` } `mapstructure:"rpc"` - Prometheus struct { - Enable bool `mapstructure:"enable"` - Ports []int `mapstructure:"ports"` - } `mapstructure:"prometheus"` + Prometheus Prometheus `mapstructure:"prometheus"` } type Group struct { @@ -241,10 +228,7 @@ type Group struct { ListenIP string `mapstructure:"listenIP"` Ports []int `mapstructure:"ports"` } `mapstructure:"rpc"` - Prometheus struct { - Enable bool `mapstructure:"enable"` - Ports []int `mapstructure:"ports"` - } `mapstructure:"prometheus"` + Prometheus Prometheus `mapstructure:"prometheus"` } type Msg struct { @@ -253,13 +237,10 @@ type Msg struct { ListenIP string `mapstructure:"listenIP"` Ports []int `mapstructure:"ports"` } `mapstructure:"rpc"` - Prometheus struct { - Enable bool `mapstructure:"enable"` - Ports []int `mapstructure:"ports"` - } `mapstructure:"prometheus"` - FriendVerify bool `mapstructure:"friendVerify"` - GroupMessageHasReadReceiptEnable bool `mapstructure:"groupMessageHasReadReceiptEnable"` - SingleMessageHasReadReceiptEnable bool `mapstructure:"singleMessageHasReadReceiptEnable"` + Prometheus Prometheus `mapstructure:"prometheus"` + FriendVerify bool `mapstructure:"friendVerify"` + GroupMessageHasReadReceiptEnable bool `mapstructure:"groupMessageHasReadReceiptEnable"` + SingleMessageHasReadReceiptEnable bool `mapstructure:"singleMessageHasReadReceiptEnable"` } type Third struct { @@ -268,11 +249,8 @@ type Third struct { ListenIP string `mapstructure:"listenIP"` Ports []int `mapstructure:"ports"` } `mapstructure:"rpc"` - Prometheus struct { - Enable bool `mapstructure:"enable"` - Ports []int `mapstructure:"ports"` - } `mapstructure:"prometheus"` - Object struct { + Prometheus Prometheus `mapstructure:"prometheus"` + Object struct { Enable string `mapstructure:"enable"` Cos struct { BucketURL string `mapstructure:"bucketURL"` @@ -316,10 +294,7 @@ type User struct { ListenIP string `mapstructure:"listenIP"` Ports []int `mapstructure:"ports"` } `mapstructure:"rpc"` - Prometheus struct { - Enable bool `mapstructure:"enable"` - Ports []int `mapstructure:"ports"` - } `mapstructure:"prometheus"` + Prometheus Prometheus `mapstructure:"prometheus"` } type Redis struct { @@ -394,6 +369,7 @@ type ZooKeeper struct { Address []string `mapstructure:"address"` Username string `mapstructure:"username"` Password string `mapstructure:"password"` + Env string `mapstructure:"env"` RpcRegisterName struct { User string `mapstructure:"User"` Friend string `mapstructure:"Friend"` diff --git a/pkg/common/config/load_config.go b/pkg/common/config/load_config.go index 247d36276..4101b0b25 100644 --- a/pkg/common/config/load_config.go +++ b/pkg/common/config/load_config.go @@ -2,25 +2,26 @@ package config import ( "github.com/mitchellh/mapstructure" + "github.com/openimsdk/tools/errs" "github.com/spf13/viper" "strings" ) -func LoadConfig(path string, prefix string, config any) error { +func LoadConfig(path string, envPrefix string, config any) error { v := viper.New() v.SetConfigFile(path) - v.SetEnvPrefix(prefix) + v.SetEnvPrefix(envPrefix) v.AutomaticEnv() v.SetEnvKeyReplacer(strings.NewReplacer(".", "_")) if err := v.ReadInConfig(); err != nil { - return err + return errs.WrapMsg(err, "failed to read config file", "path", path, "envPrefix", envPrefix) } if err := v.Unmarshal(config, func(config *mapstructure.DecoderConfig) { config.TagName = "mapstructure" }); err != nil { - return err + return errs.WrapMsg(err, "failed to unmarshal config", "path", path, "envPrefix", envPrefix) } return nil diff --git a/pkg/common/discoveryregister/discoveryregister.go b/pkg/common/discoveryregister/discoveryregister.go index 3ac8710aa..1c6cb973a 100644 --- a/pkg/common/discoveryregister/discoveryregister.go +++ b/pkg/common/discoveryregister/discoveryregister.go @@ -21,14 +21,26 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister/zookeeper" "github.com/openimsdk/tools/discovery" "github.com/openimsdk/tools/errs" + "time" ) // NewDiscoveryRegister creates a new service discovery and registry client based on the provided environment type. -func NewDiscoveryRegister(config *config.GlobalConfig) (discovery.SvcDiscoveryRegistry, error) { +func NewDiscoveryRegister(zookeeperConfig *config.ZooKeeper) (discovery.SvcDiscoveryRegistry, error) { - switch config.Envs.Discovery { + switch zookeeperConfig.Env { case "zookeeper": return zookeeper.NewZookeeperDiscoveryRegister(&config.Zookeeper) + zk, err := zookeeper.NewZkClient( + zookeeperConfig.zkAddr, + schema, + zookeeper.WithFreq(time.Hour), + zookeeper.WithUserNameAndPassword(username, password), + zookeeper.WithRoundRobin(), + zookeeper.WithTimeout(10), + ) + if err != nil { + return nil, err + } case "k8s": return kubernetes.NewK8sDiscoveryRegister(config.RpcRegisterName.OpenImMessageGatewayName) case "direct": diff --git a/pkg/common/prommetrics/prommetrics.go b/pkg/common/prommetrics/prommetrics.go index 33d4c2b40..a562b3c26 100644 --- a/pkg/common/prommetrics/prommetrics.go +++ b/pkg/common/prommetrics/prommetrics.go @@ -31,17 +31,17 @@ func NewGrpcPromObj(cusMetrics []prometheus.Collector) (*prometheus.Registry, *g return reg, grpcMetrics, nil } -func GetGrpcCusMetrics(registerName string, rpcRegisterName *config2.RpcRegisterName) []prometheus.Collector { +func GetGrpcCusMetrics(registerName string, zookeeper *config2.ZooKeeper) []prometheus.Collector { switch registerName { - case rpcRegisterName.OpenImMessageGatewayName: + case zookeeper.RpcRegisterName.MessageGateway: return []prometheus.Collector{OnlineUserGauge} - case rpcRegisterName.OpenImMsgName: + case zookeeper.RpcRegisterName.Msg: return []prometheus.Collector{SingleChatMsgProcessSuccessCounter, SingleChatMsgProcessFailedCounter, GroupChatMsgProcessSuccessCounter, GroupChatMsgProcessFailedCounter} case "Transfer": return []prometheus.Collector{MsgInsertRedisSuccessCounter, MsgInsertRedisFailedCounter, MsgInsertMongoSuccessCounter, MsgInsertMongoFailedCounter, SeqSetFailedCounter} - case rpcRegisterName.OpenImPushName: + case zookeeper.RpcRegisterName.Push: return []prometheus.Collector{MsgOfflinePushFailedCounter} - case rpcRegisterName.OpenImAuthName: + case zookeeper.RpcRegisterName.Auth: return []prometheus.Collector{UserLoginCounter} default: return nil diff --git a/pkg/common/startrpc/start.go b/pkg/common/startrpc/start.go index d1a942cef..e8f1ba414 100644 --- a/pkg/common/startrpc/start.go +++ b/pkg/common/startrpc/start.go @@ -17,6 +17,8 @@ package startrpc import ( "context" "fmt" + config2 "github.com/openimsdk/open-im-server/v3/pkg/common/config" + "github.com/prometheus/client_golang/prometheus" "net" "net/http" "os" @@ -27,8 +29,6 @@ import ( "time" grpcprometheus "github.com/grpc-ecosystem/go-grpc-prometheus" - "github.com/openimsdk/open-im-server/v3/pkg/common/config" - config2 "github.com/openimsdk/open-im-server/v3/pkg/common/config" kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister" "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" "github.com/openimsdk/tools/discovery" @@ -37,17 +37,27 @@ import ( "github.com/openimsdk/tools/mw" "github.com/openimsdk/tools/system/program" "github.com/openimsdk/tools/utils/network" - "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" ) // Start rpc server. -func Start(ctx context.Context, rpcPort int, rpcRegisterName string, prometheusPort int, config *config2.GlobalConfig, rpcFn func(ctx context.Context, config *config.GlobalConfig, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error, options ...grpc.ServerOption) error { +func Start[T any](ctx context.Context, zookeeperConfig *config2.ZooKeeper, prometheusConfig *config2.Prometheus, listenIP, + registerIP string, rpcPorts []int, index int, rpcRegisterName string, config T, rpcFn func(ctx context.Context, + config T, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error, options ...grpc.ServerOption) error { + + rpcPort, err := getElemByIndex(rpcPorts, index) + if err != nil { + return err + } + prometheusPort, err := getElemByIndex(prometheusConfig.Ports, index) + if err != nil { + return err + } log.CInfo(ctx, "RPC server is initializing", "rpcRegisterName", rpcRegisterName, "rpcPort", rpcPort, "prometheusPort", prometheusPort) - rpcTcpAddr := net.JoinHostPort(network.GetListenIP(config.Rpc.ListenIP), strconv.Itoa(rpcPort)) + rpcTcpAddr := net.JoinHostPort(network.GetListenIP(listenIP), strconv.Itoa(rpcPort)) listener, err := net.Listen( "tcp", rpcTcpAddr, @@ -57,22 +67,22 @@ func Start(ctx context.Context, rpcPort int, rpcRegisterName string, prometheusP } defer listener.Close() - client, err := kdisc.NewDiscoveryRegister(config) + client, err := kdisc.NewDiscoveryRegister(zookeeperConfig) if err != nil { return err } defer client.Close() client.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, "round_robin"))) - registerIP, err := network.GetRpcRegisterIP(config.Rpc.RegisterIP) + registerIP, err = network.GetRpcRegisterIP(registerIP) if err != nil { return err } var reg *prometheus.Registry var metric *grpcprometheus.ServerMetrics - if config.Prometheus.Enable { - cusMetrics := prommetrics.GetGrpcCusMetrics(rpcRegisterName, &config.RpcRegisterName) + if prometheusConfig.Enable { + cusMetrics := prommetrics.GetGrpcCusMetrics(rpcRegisterName, zookeeperConfig) reg, metric, _ = prommetrics.NewGrpcPromObj(cusMetrics) options = append(options, mw.GrpcServer(), grpc.StreamInterceptor(metric.StreamServerInterceptor()), grpc.UnaryInterceptor(metric.UnaryServerInterceptor())) @@ -108,7 +118,7 @@ func Start(ctx context.Context, rpcPort int, rpcRegisterName string, prometheusP ) go func() { - if config.Prometheus.Enable && prometheusPort != 0 { + if prometheusConfig.Enable && prometheusPort != 0 { metric.InitializeMetrics(srv) // Create a HTTP server for prometheus. httpServer = &http.Server{Handler: promhttp.HandlerFor(reg, promhttp.HandlerOpts{}), Addr: fmt.Sprintf("0.0.0.0:%d", prometheusPort)} @@ -163,3 +173,11 @@ func gracefulStopWithCtx(ctx context.Context, f func()) error { return nil } } + +func getElemByIndex(array []int, index int) (int, error) { + if index < 0 || index >= len(array) { + return 0, errs.New("index out of range", "index", index, "array", array).Wrap() + } + + return array[index], nil +}