refactor: webhooks update.

pull/2148/head
Gordon 2 years ago
parent 60dd5004b5
commit 38c6ff7e7a

@ -21,11 +21,7 @@ import (
)
func main() {
authCmd := cmd.NewRpcCmd(cmd.RpcAuthServer, auth.Start)
authCmd.AddPortFlag()
authCmd.AddPrometheusPortFlag()
if err := authCmd.Exec(); err != nil {
if err := cmd.NewAuthRpcCmd(auth.Start).Exec(); err != nil {
program.ExitWithError(err)
}
}
}

@ -4,6 +4,7 @@ address: [ 172.28.0.1:12181 ]
username: ''
password: ''
env: zookeeper
rpcRegisterName:
User: User
Friend: Friend

@ -16,10 +16,10 @@ package auth
import (
"context"
"github.com/openimsdk/open-im-server/v3/pkg/common/cmd"
"github.com/openimsdk/tools/db/redisutil"
"github.com/openimsdk/open-im-server/v3/pkg/authverify"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/cache"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/controller"
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
@ -40,22 +40,22 @@ type authServer struct {
authDatabase controller.AuthDatabase
userRpcClient *rpcclient.UserRpcClient
RegisterCenter discovery.SvcDiscoveryRegistry
config *config.GlobalConfig
config *cmd.AuthConfig
}
func Start(ctx context.Context, config *config.GlobalConfig, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error {
rdb, err := redisutil.NewRedisClient(ctx, config.Redis.Build())
func Start(ctx context.Context, config *cmd.AuthConfig, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error {
rdb, err := redisutil.NewRedisClient(ctx, config.RedisConfig.Build())
if err != nil {
return err
}
userRpcClient := rpcclient.NewUserRpcClient(client, config.RpcRegisterName.OpenImUserName, &config.Manager, &config.IMAdmin)
userRpcClient := rpcclient.NewUserRpcClient(client, config.ZookeeperConfig.RpcRegisterName.User, &config.Manager, &config.IMAdmin)
pbauth.RegisterAuthServer(server, &authServer{
userRpcClient: &userRpcClient,
RegisterCenter: client,
authDatabase: controller.NewAuthDatabase(
cache.NewTokenCacheModel(rdb),
config.Secret,
config.TokenPolicy.Expire,
config.RpcConfig.Secret,
config.RpcConfig.TokenPolicy.Expire,
),
config: config,
})

@ -28,3 +28,40 @@ const (
MsgGatewayServer = "msgGateway"
MsgTransferServer = "msgTransfer"
)
const (
FileName = "config.yaml"
NotificationFileName = "notification.yaml"
KafkaConfigFileName = "kafka.yml"
RedisConfigFileName = "redis.yml"
WebhooksConfigFileName = "webhooks.yml"
ZookeeperConfigFileName = "zookeeper.yml"
MongodbConfigFileName = "mongodb.yml"
MinioConfigFileName = "minio.yml"
LogConfigFileName = "log.yml"
OpenIMAPICfgFileName = "openim-api.yml"
OpenIMCronTaskCfgFileName = "openim-crontask.yml"
OpenIMMsgGatewayCfgFileName = "openim-msg-gateway.yml"
OpenIMMsgTransferCfgFileName = "openim-msg-transfer.yml"
OpenIMPushCfgFileName = "openim-push.yml"
OpenIMRPCAuthCfgFileName = "openim-rpc-auth.yml"
OpenIMRPCConversationCfgFileName = "openim-rpc-conversation.yml"
OpenIMRPCFriendCfgFileName = "openim-rpc-friend.yml"
OpenIMRPCGroupCfgFileName = "openim-rpc-group.yml"
OpenIMRPCMsgCfgFileName = "openim-rpc-msg.yml"
OpenIMRPCThirdCfgFileName = "openim-rpc-third.yml"
OpenIMRPCUserCfgFileName = "openim-rpc-user.yml"
)
const (
logEnvPrefix = "openim-log"
redisEnvPrefix = "openim-redis"
mongodbEnvPrefix = "openim-mongodb"
zoopkeeperEnvPrefix = "openim-zookeeper"
authEnvPrefix = "openim-auth"
)
const (
FlagConf = "config_folder_path"
FlagTransferIndex = "index"
)

@ -16,35 +16,38 @@ package cmd
import (
"fmt"
"path/filepath"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
config2 "github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/protocol/constant"
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/log"
"github.com/spf13/cobra"
)
type RootCmdPt interface {
GetPortFromConfig(portType string) int
}
type RootCmd struct {
Command cobra.Command
Name string
processName string
port int
prometheusPort int
cmdItf RootCmdPt
config *config.GlobalConfig
log *config2.Log
index int
}
func (r *RootCmd) Index() int {
return r.index
}
func (rc *RootCmd) Port() int {
return rc.port
func (r *RootCmd) Port() int {
return r.port
}
type CmdOpts struct {
loggerPrefixName string
configMap map[string]StructEnvPrefix
}
type StructEnvPrefix struct {
EnvPrefix string
ConfigStruct any
}
func WithCronTaskLogName() func(*CmdOpts) {
@ -58,13 +61,17 @@ func WithLogName(logName string) func(*CmdOpts) {
opts.loggerPrefixName = logName
}
}
func WithConfigMap(configMap map[string]StructEnvPrefix) func(*CmdOpts) {
return func(opts *CmdOpts) {
opts.configMap = configMap
}
}
func NewRootCmd(processName, name string, opts ...func(*CmdOpts)) *RootCmd {
rootCmd := &RootCmd{processName: processName, Name: name, config: config.NewGlobalConfig()}
func NewRootCmd(processName string, opts ...func(*CmdOpts)) *RootCmd {
rootCmd := &RootCmd{processName: processName}
cmd := cobra.Command{
Use: "Start openIM application",
Short: fmt.Sprintf(`Start %s `, name),
Long: fmt.Sprintf(`Start %s `, name),
Use: "Start openIM application",
Long: fmt.Sprintf(`Start %s `, processName),
PersistentPreRunE: func(cmd *cobra.Command, args []string) error {
return rootCmd.persistentPreRun(cmd, opts...)
},
@ -72,29 +79,39 @@ func NewRootCmd(processName, name string, opts ...func(*CmdOpts)) *RootCmd {
SilenceErrors: true,
}
rootCmd.Command = cmd
rootCmd.addConfFlag()
return rootCmd
}
func (rc *RootCmd) persistentPreRun(cmd *cobra.Command, opts ...func(*CmdOpts)) error {
if err := rc.initializeConfiguration(cmd); err != nil {
func (r *RootCmd) persistentPreRun(cmd *cobra.Command, opts ...func(*CmdOpts)) error {
cmdOpts := r.applyOptions(opts...)
if err := r.initializeConfiguration(cmd, cmdOpts); err != nil {
return err
}
cmdOpts := rc.applyOptions(opts...)
if err := rc.initializeLogger(cmdOpts); err != nil {
if err := r.initializeLogger(cmdOpts); err != nil {
return errs.WrapMsg(err, "failed to initialize logger")
}
return nil
}
func (rc *RootCmd) initializeConfiguration(cmd *cobra.Command) error {
return rc.getConfFromCmdAndInit(cmd)
func (r *RootCmd) initializeConfiguration(cmd *cobra.Command, opts *CmdOpts) error {
configDirectory, _, err := r.getFlag(cmd)
if err != nil {
return err
}
for configFileName, structEnvPrefix := range opts.configMap {
err := config2.LoadConfig(filepath.Join(configDirectory, configFileName),
structEnvPrefix.EnvPrefix, structEnvPrefix.ConfigStruct)
if err != nil {
return err
}
}
return config2.LoadConfig(filepath.Join(configDirectory, LogConfigFileName),
logEnvPrefix, r.log)
}
func (rc *RootCmd) applyOptions(opts ...func(*CmdOpts)) *CmdOpts {
func (r *RootCmd) applyOptions(opts ...func(*CmdOpts)) *CmdOpts {
cmdOpts := defaultCmdOpts()
for _, opt := range opts {
opt(cmdOpts)
@ -103,25 +120,23 @@ func (rc *RootCmd) applyOptions(opts ...func(*CmdOpts)) *CmdOpts {
return cmdOpts
}
func (rc *RootCmd) initializeLogger(cmdOpts *CmdOpts) error {
logConfig := rc.config.Log
func (r *RootCmd) initializeLogger(cmdOpts *CmdOpts) error {
err := log.InitFromConfig(
cmdOpts.loggerPrefixName,
rc.processName,
logConfig.RemainLogLevel,
logConfig.IsStdout,
logConfig.IsJson,
logConfig.StorageLocation,
logConfig.RemainRotationCount,
logConfig.RotationTime,
r.processName,
r.log.RemainLogLevel,
r.log.IsStdout,
r.log.IsJson,
r.log.StorageLocation,
r.log.RemainRotationCount,
r.log.RotationTime,
config2.Version,
)
if err != nil {
return errs.Wrap(err)
}
return errs.Wrap(log.InitConsoleLogger(rc.processName, logConfig.RemainLogLevel, logConfig.IsJson, config2.Version))
return errs.Wrap(log.InitConsoleLogger(r.processName, r.log.RemainLogLevel, r.log.IsJson, config2.Version))
}
@ -131,69 +146,21 @@ func defaultCmdOpts() *CmdOpts {
}
}
func (r *RootCmd) SetRootCmdPt(cmdItf RootCmdPt) {
r.cmdItf = cmdItf
}
func (r *RootCmd) addConfFlag() {
r.Command.Flags().StringP(constant.FlagConf, "c", "", "path to config file folder")
}
func (r *RootCmd) AddPortFlag() {
r.Command.Flags().IntP(constant.FlagPort, "p", 0, "server listen port")
}
func (r *RootCmd) getPortFlag(cmd *cobra.Command) int {
port, err := cmd.Flags().GetInt(constant.FlagPort)
func (r *RootCmd) getFlag(cmd *cobra.Command) (string, int, error) {
r.Command.Flags().StringP(FlagConf, "c", "", "path of config directory")
configDirectory, err := cmd.Flags().GetString(FlagConf)
if err != nil {
// Wrapping the error with additional context
return 0
}
if port == 0 {
port = r.PortFromConfig(constant.FlagPort)
return "", 0, errs.Wrap(err)
}
return port
}
// // GetPortFlag returns the port flag.
func (r *RootCmd) GetPortFlag() int {
return r.port
}
func (r *RootCmd) AddPrometheusPortFlag() {
r.Command.Flags().IntP(constant.FlagPrometheusPort, "", 0, "server prometheus listen port")
}
func (r *RootCmd) getPrometheusPortFlag(cmd *cobra.Command) int {
port, err := cmd.Flags().GetInt(constant.FlagPrometheusPort)
if err != nil || port == 0 {
port = r.PortFromConfig(constant.FlagPrometheusPort)
if err != nil {
return 0
}
r.Command.Flags().IntP(FlagTransferIndex, "i", 0, "process startup sequence number")
index, err := cmd.Flags().GetInt(FlagTransferIndex)
if err != nil {
return "", 0, errs.Wrap(err)
}
return port
}
func (r *RootCmd) GetPrometheusPortFlag() int {
return r.prometheusPort
}
func (r *RootCmd) getConfFromCmdAndInit(cmdLines *cobra.Command) error {
configFolderPath, _ := cmdLines.Flags().GetString(constant.FlagConf)
return config2.InitConfig(r.config, configFolderPath)
r.index = index
return configDirectory, index, nil
}
func (r *RootCmd) Execute() error {
return r.Command.Execute()
}
func (r *RootCmd) AddCommand(cmds ...*cobra.Command) {
r.Command.AddCommand(cmds...)
}
func (r *RootCmd) PortFromConfig(portType string) int {
// Retrieve the port and cache it
port := r.cmdItf.GetPortFromConfig(portType)
return port
}

@ -18,141 +18,46 @@ import (
"context"
config2 "github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/startrpc"
"github.com/openimsdk/protocol/constant"
"github.com/openimsdk/tools/discovery"
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/system/program"
"github.com/spf13/cobra"
"google.golang.org/grpc"
)
type rpcInitFuc func(ctx context.Context, config *config2.GlobalConfig, disCov discovery.SvcDiscoveryRegistry, server *grpc.Server) error
type rpcInitFuc func(ctx context.Context, config *AuthConfig, disCov discovery.SvcDiscoveryRegistry, server *grpc.Server) error
type RpcCmd struct {
type AuthRpcCmd struct {
*RootCmd
RpcRegisterName string
initFunc rpcInitFuc
ctx context.Context
}
func NewRpcCmd(name string, initFunc rpcInitFuc) *RpcCmd {
ret := &RpcCmd{RootCmd: NewRootCmd(program.GetProcessName(), name), initFunc: initFunc}
initFunc rpcInitFuc
ctx context.Context
configMap map[string]StructEnvPrefix
authConfig AuthConfig
}
type AuthConfig struct {
RpcConfig config2.Auth
RedisConfig config2.Redis
ZookeeperConfig config2.ZooKeeper
}
func NewAuthRpcCmd(initFunc rpcInitFuc) *AuthRpcCmd {
var authConfig AuthConfig
ret := &AuthRpcCmd{initFunc: initFunc, authConfig: authConfig}
ret.configMap = map[string]StructEnvPrefix{
OpenIMRPCAuthCfgFileName: {EnvPrefix: authEnvPrefix, ConfigStruct: &authConfig.RpcConfig},
RedisConfigFileName: {EnvPrefix: redisEnvPrefix, ConfigStruct: &authConfig.RedisConfig},
ZookeeperConfigFileName: {EnvPrefix: zoopkeeperEnvPrefix, ConfigStruct: &authConfig.ZookeeperConfig},
}
ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap))
ret.ctx = context.WithValue(context.Background(), "version", config2.Version)
ret.addPreRun()
ret.addRunE()
ret.SetRootCmdPt(ret)
ret.RunE()
return ret
}
func (a *RpcCmd) addPreRun() {
a.Command.PreRun = func(cmd *cobra.Command, args []string) {
a.port = a.getPortFlag(cmd)
a.prometheusPort = a.getPrometheusPortFlag(cmd)
}
}
func (a *RpcCmd) addRunE() {
a.Command.RunE = func(cmd *cobra.Command, args []string) error {
rpcRegisterName, err := a.GetRpcRegisterNameFromConfig()
if err != nil {
return err
} else {
return a.StartSvr(rpcRegisterName, a.initFunc)
}
}
}
func (a *RpcCmd) Exec() error {
func (a *AuthRpcCmd) Exec() error {
return a.Execute()
}
func (a *RpcCmd) StartSvr(name string, rpcFn func(ctx context.Context, config *config2.GlobalConfig, disCov discovery.SvcDiscoveryRegistry, server *grpc.Server) error) error {
if a.GetPortFlag() == 0 {
return errs.New("port is required").Wrap()
}
return startrpc.Start(a.ctx, a.GetPortFlag(), name, a.GetPrometheusPortFlag(), a.config, rpcFn)
}
func (a *RpcCmd) GetPortFromConfig(portType string) int {
switch a.Name {
case RpcPushServer:
if portType == constant.FlagPort {
return a.config.RpcPort.OpenImPushPort[0]
}
if portType == constant.FlagPrometheusPort {
return a.config.Prometheus.PushPrometheusPort[0]
}
case RpcAuthServer:
if portType == constant.FlagPort {
return a.config.RpcPort.OpenImAuthPort[0]
}
if portType == constant.FlagPrometheusPort {
return a.config.Prometheus.AuthPrometheusPort[0]
}
case RpcConversationServer:
if portType == constant.FlagPort {
return a.config.RpcPort.OpenImConversationPort[0]
}
if portType == constant.FlagPrometheusPort {
return a.config.Prometheus.ConversationPrometheusPort[0]
}
case RpcFriendServer:
if portType == constant.FlagPort {
return a.config.RpcPort.OpenImFriendPort[0]
}
if portType == constant.FlagPrometheusPort {
return a.config.Prometheus.FriendPrometheusPort[0]
}
case RpcGroupServer:
if portType == constant.FlagPort {
return a.config.RpcPort.OpenImGroupPort[0]
}
if portType == constant.FlagPrometheusPort {
return a.config.Prometheus.GroupPrometheusPort[0]
}
case RpcMsgServer:
if portType == constant.FlagPort {
return a.config.RpcPort.OpenImMessagePort[0]
}
if portType == constant.FlagPrometheusPort {
return a.config.Prometheus.MessagePrometheusPort[0]
}
case RpcThirdServer:
if portType == constant.FlagPort {
return a.config.RpcPort.OpenImThirdPort[0]
}
if portType == constant.FlagPrometheusPort {
return a.config.Prometheus.ThirdPrometheusPort[0]
}
case RpcUserServer:
if portType == constant.FlagPort {
return a.config.RpcPort.OpenImUserPort[0]
}
if portType == constant.FlagPrometheusPort {
return a.config.Prometheus.UserPrometheusPort[0]
}
}
return 0
}
func (a *RpcCmd) GetRpcRegisterNameFromConfig() (string, error) {
switch a.Name {
case RpcPushServer:
return a.config.RpcRegisterName.OpenImPushName, nil
case RpcAuthServer:
return a.config.RpcRegisterName.OpenImAuthName, nil
case RpcConversationServer:
return a.config.RpcRegisterName.OpenImConversationName, nil
case RpcFriendServer:
return a.config.RpcRegisterName.OpenImFriendName, nil
case RpcGroupServer:
return a.config.RpcRegisterName.OpenImGroupName, nil
case RpcMsgServer:
return a.config.RpcRegisterName.OpenImMsgName, nil
case RpcThirdServer:
return a.config.RpcRegisterName.OpenImThirdName, nil
case RpcUserServer:
return a.config.RpcRegisterName.OpenImUserName, nil
}
return "", errs.New("unrecognized RPC server name", "rpcName", a.Name).Wrap()
func (a *AuthRpcCmd) RunE() error {
return startrpc.Start(a.ctx, &a.authConfig.ZookeeperConfig, &a.authConfig.RpcConfig.Prometheus, a.authConfig.RpcConfig.RPC.ListenIP,
a.authConfig.RpcConfig.RPC.RegisterIP, a.authConfig.RpcConfig.RPC.Ports,
a.Index(), a.authConfig.ZookeeperConfig.RpcRegisterName.Auth, &a.authConfig, a.initFunc)
}

@ -37,8 +37,8 @@ type LocalCache struct {
type Log struct {
StorageLocation string `mapstructure:"storageLocation"`
RotationTime int `mapstructure:"rotationTime"`
RemainRotationCount int `mapstructure:"remainRotationCount"`
RotationTime uint `mapstructure:"rotationTime"`
RemainRotationCount uint `mapstructure:"remainRotationCount"`
RemainLogLevel int `mapstructure:"remainLogLevel"`
IsStdout bool `mapstructure:"isStdout"`
IsJson bool `mapstructure:"isJson"`
@ -133,16 +133,18 @@ type Notification struct {
ConversationSetPrivate NotificationConfig `mapstructure:"conversationSetPrivate"`
}
type Prometheus struct {
Enable bool `mapstructure:"enable"`
Ports []int `mapstructure:"ports"`
}
type MsgGateway struct {
RPC struct {
RegisterIP string `mapstructure:"registerIP"`
Ports []int `mapstructure:"ports"`
} `mapstructure:"rpc"`
Prometheus struct {
Enable bool `mapstructure:"enable"`
Ports []int `mapstructure:"ports"`
} `mapstructure:"prometheus"`
ListenIP string `mapstructure:"listenIP"`
Prometheus Prometheus `mapstructure:"prometheus"`
ListenIP string `mapstructure:"listenIP"`
LongConnSvr struct {
Ports []int `mapstructure:"ports"`
WebsocketMaxConnNum int `mapstructure:"websocketMaxConnNum"`
@ -153,11 +155,8 @@ type MsgGateway struct {
}
type MsgTransfer struct {
Prometheus struct {
Enable bool `mapstructure:"enable"`
Ports []int `mapstructure:"ports"`
} `mapstructure:"prometheus"`
MsgCacheTimeout int `mapstructure:"msgCacheTimeout"`
Prometheus Prometheus `mapstructure:"prometheus"`
MsgCacheTimeout int `mapstructure:"msgCacheTimeout"`
}
type Push struct {
@ -166,12 +165,9 @@ type Push struct {
ListenIP string `mapstructure:"listenIP"`
Ports []int `mapstructure:"ports"`
} `mapstructure:"rpc"`
Prometheus struct {
Enable bool `mapstructure:"enable"`
Ports []int `mapstructure:"ports"`
} `mapstructure:"prometheus"`
Enable string `mapstructure:"enable"`
GeTui struct {
Prometheus Prometheus `mapstructure:"prometheus"`
Enable string `mapstructure:"enable"`
GeTui struct {
PushUrl string `mapstructure:"pushUrl"`
MasterSecret string `mapstructure:"masterSecret"`
AppKey string `mapstructure:"appKey"`
@ -201,12 +197,9 @@ type Auth struct {
ListenIP string `mapstructure:"listenIP"`
Ports []int `mapstructure:"ports"`
} `mapstructure:"rpc"`
Prometheus struct {
Enable bool `mapstructure:"enable"`
Ports []int `mapstructure:"ports"`
} `mapstructure:"prometheus"`
Prometheus Prometheus `mapstructure:"prometheus"`
TokenPolicy struct {
Expire int `mapstructure:"expire"`
Expire int64 `mapstructure:"expire"`
} `mapstructure:"tokenPolicy"`
Secret string `mapstructure:"secret"`
}
@ -217,10 +210,7 @@ type Conversation struct {
ListenIP string `mapstructure:"listenIP"`
Ports []int `mapstructure:"ports"`
} `mapstructure:"rpc"`
Prometheus struct {
Enable bool `mapstructure:"enable"`
Ports []int `mapstructure:"ports"`
} `mapstructure:"prometheus"`
Prometheus Prometheus `mapstructure:"prometheus"`
}
type Friend struct {
@ -229,10 +219,7 @@ type Friend struct {
ListenIP string `mapstructure:"listenIP"`
Ports []int `mapstructure:"ports"`
} `mapstructure:"rpc"`
Prometheus struct {
Enable bool `mapstructure:"enable"`
Ports []int `mapstructure:"ports"`
} `mapstructure:"prometheus"`
Prometheus Prometheus `mapstructure:"prometheus"`
}
type Group struct {
@ -241,10 +228,7 @@ type Group struct {
ListenIP string `mapstructure:"listenIP"`
Ports []int `mapstructure:"ports"`
} `mapstructure:"rpc"`
Prometheus struct {
Enable bool `mapstructure:"enable"`
Ports []int `mapstructure:"ports"`
} `mapstructure:"prometheus"`
Prometheus Prometheus `mapstructure:"prometheus"`
}
type Msg struct {
@ -253,13 +237,10 @@ type Msg struct {
ListenIP string `mapstructure:"listenIP"`
Ports []int `mapstructure:"ports"`
} `mapstructure:"rpc"`
Prometheus struct {
Enable bool `mapstructure:"enable"`
Ports []int `mapstructure:"ports"`
} `mapstructure:"prometheus"`
FriendVerify bool `mapstructure:"friendVerify"`
GroupMessageHasReadReceiptEnable bool `mapstructure:"groupMessageHasReadReceiptEnable"`
SingleMessageHasReadReceiptEnable bool `mapstructure:"singleMessageHasReadReceiptEnable"`
Prometheus Prometheus `mapstructure:"prometheus"`
FriendVerify bool `mapstructure:"friendVerify"`
GroupMessageHasReadReceiptEnable bool `mapstructure:"groupMessageHasReadReceiptEnable"`
SingleMessageHasReadReceiptEnable bool `mapstructure:"singleMessageHasReadReceiptEnable"`
}
type Third struct {
@ -268,11 +249,8 @@ type Third struct {
ListenIP string `mapstructure:"listenIP"`
Ports []int `mapstructure:"ports"`
} `mapstructure:"rpc"`
Prometheus struct {
Enable bool `mapstructure:"enable"`
Ports []int `mapstructure:"ports"`
} `mapstructure:"prometheus"`
Object struct {
Prometheus Prometheus `mapstructure:"prometheus"`
Object struct {
Enable string `mapstructure:"enable"`
Cos struct {
BucketURL string `mapstructure:"bucketURL"`
@ -316,10 +294,7 @@ type User struct {
ListenIP string `mapstructure:"listenIP"`
Ports []int `mapstructure:"ports"`
} `mapstructure:"rpc"`
Prometheus struct {
Enable bool `mapstructure:"enable"`
Ports []int `mapstructure:"ports"`
} `mapstructure:"prometheus"`
Prometheus Prometheus `mapstructure:"prometheus"`
}
type Redis struct {
@ -394,6 +369,7 @@ type ZooKeeper struct {
Address []string `mapstructure:"address"`
Username string `mapstructure:"username"`
Password string `mapstructure:"password"`
Env string `mapstructure:"env"`
RpcRegisterName struct {
User string `mapstructure:"User"`
Friend string `mapstructure:"Friend"`

@ -2,25 +2,26 @@ package config
import (
"github.com/mitchellh/mapstructure"
"github.com/openimsdk/tools/errs"
"github.com/spf13/viper"
"strings"
)
func LoadConfig(path string, prefix string, config any) error {
func LoadConfig(path string, envPrefix string, config any) error {
v := viper.New()
v.SetConfigFile(path)
v.SetEnvPrefix(prefix)
v.SetEnvPrefix(envPrefix)
v.AutomaticEnv()
v.SetEnvKeyReplacer(strings.NewReplacer(".", "_"))
if err := v.ReadInConfig(); err != nil {
return err
return errs.WrapMsg(err, "failed to read config file", "path", path, "envPrefix", envPrefix)
}
if err := v.Unmarshal(config, func(config *mapstructure.DecoderConfig) {
config.TagName = "mapstructure"
}); err != nil {
return err
return errs.WrapMsg(err, "failed to unmarshal config", "path", path, "envPrefix", envPrefix)
}
return nil

@ -21,14 +21,26 @@ import (
"github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister/zookeeper"
"github.com/openimsdk/tools/discovery"
"github.com/openimsdk/tools/errs"
"time"
)
// NewDiscoveryRegister creates a new service discovery and registry client based on the provided environment type.
func NewDiscoveryRegister(config *config.GlobalConfig) (discovery.SvcDiscoveryRegistry, error) {
func NewDiscoveryRegister(zookeeperConfig *config.ZooKeeper) (discovery.SvcDiscoveryRegistry, error) {
switch config.Envs.Discovery {
switch zookeeperConfig.Env {
case "zookeeper":
return zookeeper.NewZookeeperDiscoveryRegister(&config.Zookeeper)
zk, err := zookeeper.NewZkClient(
zookeeperConfig.zkAddr,
schema,
zookeeper.WithFreq(time.Hour),
zookeeper.WithUserNameAndPassword(username, password),
zookeeper.WithRoundRobin(),
zookeeper.WithTimeout(10),
)
if err != nil {
return nil, err
}
case "k8s":
return kubernetes.NewK8sDiscoveryRegister(config.RpcRegisterName.OpenImMessageGatewayName)
case "direct":

@ -31,17 +31,17 @@ func NewGrpcPromObj(cusMetrics []prometheus.Collector) (*prometheus.Registry, *g
return reg, grpcMetrics, nil
}
func GetGrpcCusMetrics(registerName string, rpcRegisterName *config2.RpcRegisterName) []prometheus.Collector {
func GetGrpcCusMetrics(registerName string, zookeeper *config2.ZooKeeper) []prometheus.Collector {
switch registerName {
case rpcRegisterName.OpenImMessageGatewayName:
case zookeeper.RpcRegisterName.MessageGateway:
return []prometheus.Collector{OnlineUserGauge}
case rpcRegisterName.OpenImMsgName:
case zookeeper.RpcRegisterName.Msg:
return []prometheus.Collector{SingleChatMsgProcessSuccessCounter, SingleChatMsgProcessFailedCounter, GroupChatMsgProcessSuccessCounter, GroupChatMsgProcessFailedCounter}
case "Transfer":
return []prometheus.Collector{MsgInsertRedisSuccessCounter, MsgInsertRedisFailedCounter, MsgInsertMongoSuccessCounter, MsgInsertMongoFailedCounter, SeqSetFailedCounter}
case rpcRegisterName.OpenImPushName:
case zookeeper.RpcRegisterName.Push:
return []prometheus.Collector{MsgOfflinePushFailedCounter}
case rpcRegisterName.OpenImAuthName:
case zookeeper.RpcRegisterName.Auth:
return []prometheus.Collector{UserLoginCounter}
default:
return nil

@ -17,6 +17,8 @@ package startrpc
import (
"context"
"fmt"
config2 "github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/prometheus/client_golang/prometheus"
"net"
"net/http"
"os"
@ -27,8 +29,6 @@ import (
"time"
grpcprometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
config2 "github.com/openimsdk/open-im-server/v3/pkg/common/config"
kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister"
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
"github.com/openimsdk/tools/discovery"
@ -37,17 +37,27 @@ import (
"github.com/openimsdk/tools/mw"
"github.com/openimsdk/tools/system/program"
"github.com/openimsdk/tools/utils/network"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)
// Start rpc server.
func Start(ctx context.Context, rpcPort int, rpcRegisterName string, prometheusPort int, config *config2.GlobalConfig, rpcFn func(ctx context.Context, config *config.GlobalConfig, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error, options ...grpc.ServerOption) error {
func Start[T any](ctx context.Context, zookeeperConfig *config2.ZooKeeper, prometheusConfig *config2.Prometheus, listenIP,
registerIP string, rpcPorts []int, index int, rpcRegisterName string, config T, rpcFn func(ctx context.Context,
config T, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error, options ...grpc.ServerOption) error {
rpcPort, err := getElemByIndex(rpcPorts, index)
if err != nil {
return err
}
prometheusPort, err := getElemByIndex(prometheusConfig.Ports, index)
if err != nil {
return err
}
log.CInfo(ctx, "RPC server is initializing", "rpcRegisterName", rpcRegisterName, "rpcPort", rpcPort,
"prometheusPort", prometheusPort)
rpcTcpAddr := net.JoinHostPort(network.GetListenIP(config.Rpc.ListenIP), strconv.Itoa(rpcPort))
rpcTcpAddr := net.JoinHostPort(network.GetListenIP(listenIP), strconv.Itoa(rpcPort))
listener, err := net.Listen(
"tcp",
rpcTcpAddr,
@ -57,22 +67,22 @@ func Start(ctx context.Context, rpcPort int, rpcRegisterName string, prometheusP
}
defer listener.Close()
client, err := kdisc.NewDiscoveryRegister(config)
client, err := kdisc.NewDiscoveryRegister(zookeeperConfig)
if err != nil {
return err
}
defer client.Close()
client.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, "round_robin")))
registerIP, err := network.GetRpcRegisterIP(config.Rpc.RegisterIP)
registerIP, err = network.GetRpcRegisterIP(registerIP)
if err != nil {
return err
}
var reg *prometheus.Registry
var metric *grpcprometheus.ServerMetrics
if config.Prometheus.Enable {
cusMetrics := prommetrics.GetGrpcCusMetrics(rpcRegisterName, &config.RpcRegisterName)
if prometheusConfig.Enable {
cusMetrics := prommetrics.GetGrpcCusMetrics(rpcRegisterName, zookeeperConfig)
reg, metric, _ = prommetrics.NewGrpcPromObj(cusMetrics)
options = append(options, mw.GrpcServer(), grpc.StreamInterceptor(metric.StreamServerInterceptor()),
grpc.UnaryInterceptor(metric.UnaryServerInterceptor()))
@ -108,7 +118,7 @@ func Start(ctx context.Context, rpcPort int, rpcRegisterName string, prometheusP
)
go func() {
if config.Prometheus.Enable && prometheusPort != 0 {
if prometheusConfig.Enable && prometheusPort != 0 {
metric.InitializeMetrics(srv)
// Create a HTTP server for prometheus.
httpServer = &http.Server{Handler: promhttp.HandlerFor(reg, promhttp.HandlerOpts{}), Addr: fmt.Sprintf("0.0.0.0:%d", prometheusPort)}
@ -163,3 +173,11 @@ func gracefulStopWithCtx(ctx context.Context, f func()) error {
return nil
}
}
func getElemByIndex(array []int, index int) (int, error) {
if index < 0 || index >= len(array) {
return 0, errs.New("index out of range", "index", index, "array", array).Wrap()
}
return array[index], nil
}

Loading…
Cancel
Save