feat: config

pull/2997/head
icey-yu 9 months ago
parent 42588eedfd
commit 66f8df4abb

@ -51,7 +51,7 @@ func (cm *ConfigManager) GetConfig(c *gin.Context) {
apiresp.GinError(c, errs.ErrArgs.WithDetail("config name not found").Wrap()) apiresp.GinError(c, errs.ErrArgs.WithDetail("config name not found").Wrap())
return return
} }
b, err := json.Marshal(cm.config) b, err := json.Marshal(conf)
if err != nil { if err != nil {
apiresp.GinError(c, err) apiresp.GinError(c, err)
return return

@ -28,6 +28,7 @@ import (
conf "github.com/openimsdk/open-im-server/v3/pkg/common/config" conf "github.com/openimsdk/open-im-server/v3/pkg/common/config"
kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discovery" kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discovery"
disetcd "github.com/openimsdk/open-im-server/v3/pkg/common/discovery/etcd"
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient" "github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
"github.com/openimsdk/tools/discovery/etcd" "github.com/openimsdk/tools/discovery/etcd"
@ -143,6 +144,11 @@ func Start(ctx context.Context, index int, config *Config) error {
} }
}() }()
if config.Discovery.Enable == conf.ETCD {
cm := disetcd.NewConfigManager(client.(*etcd.SvcDiscoveryRegistryImpl).GetClient(), config.GetConfigNames())
cm.Watch(ctx)
}
sigs := make(chan os.Signal, 1) sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGTERM) signal.Notify(sigs, syscall.SIGTERM)

@ -49,6 +49,13 @@ func (s *Server) Start(ctx context.Context, index int, conf *Config) error {
conf.Discovery.RpcService.MessageGateway, conf.Discovery.RpcService.MessageGateway,
nil, nil,
conf, conf,
[]string{
conf.Share.GetConfigFileName(),
conf.Discovery.GetConfigFileName(),
conf.MsgGateway.GetConfigFileName(),
conf.WebhooksConfig.GetConfigFileName(),
conf.RedisConfig.GetConfigFileName(),
},
s.InitServer, s.InitServer,
) )
} }

@ -25,6 +25,7 @@ import (
"strconv" "strconv"
"syscall" "syscall"
disetcd "github.com/openimsdk/open-im-server/v3/pkg/common/discovery/etcd"
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient" "github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
"github.com/openimsdk/tools/discovery" "github.com/openimsdk/tools/discovery"
"github.com/openimsdk/tools/discovery/etcd" "github.com/openimsdk/tools/discovery/etcd"
@ -97,6 +98,19 @@ func Start(ctx context.Context, index int, config *Config) error {
return err return err
} }
if config.Discovery.Enable == conf.ETCD {
cm := disetcd.NewConfigManager(client.(*etcd.SvcDiscoveryRegistryImpl).GetClient(), []string{
config.MsgTransfer.GetConfigFileName(),
config.RedisConfig.GetConfigFileName(),
config.MongodbConfig.GetConfigFileName(),
config.KafkaConfig.GetConfigFileName(),
config.Share.GetConfigFileName(),
config.WebhooksConfig.GetConfigFileName(),
config.Discovery.GetConfigFileName(),
})
cm.Watch(ctx)
}
msgModel := redis.NewMsgCache(rdb) msgModel := redis.NewMsgCache(rdb)
msgDocModel, err := mgo.NewMsgMongo(mgocli.GetDB()) msgDocModel, err := mgo.NewMsgMongo(mgocli.GetDB())
if err != nil { if err != nil {
@ -130,6 +144,7 @@ func Start(ctx context.Context, index int, config *Config) error {
historyMongoCH: historyMongoCH, historyMongoCH: historyMongoCH,
runTimeEnv: runTimeEnv, runTimeEnv: runTimeEnv,
} }
return msgTransfer.Start(index, config, client) return msgTransfer.Start(index, config, client)
} }

@ -1,16 +1,16 @@
package apistruct package apistruct
type GetConfigReq struct { type GetConfigReq struct {
ConfigName string `json:"config_name"` ConfigName string `json:"configName"`
} }
type GetConfigListResp struct { type GetConfigListResp struct {
Environment string `json:"environment"` Environment string `json:"environment"`
Version string `json:"version"` Version string `json:"version"`
ConfigNames []string `json:"config_names"` ConfigNames []string `json:"configNames"`
} }
type SetConfigReq struct { type SetConfigReq struct {
ConfigName string `json:"config_name"` ConfigName string `json:"configName"`
Data []byte `json:"data"` Data []byte `json:"data"`
} }

@ -57,5 +57,12 @@ func (a *AuthRpcCmd) Exec() error {
func (a *AuthRpcCmd) runE() error { func (a *AuthRpcCmd) runE() error {
return startrpc.Start(a.ctx, &a.authConfig.Discovery, &a.authConfig.RpcConfig.Prometheus, a.authConfig.RpcConfig.RPC.ListenIP, return startrpc.Start(a.ctx, &a.authConfig.Discovery, &a.authConfig.RpcConfig.Prometheus, a.authConfig.RpcConfig.RPC.ListenIP,
a.authConfig.RpcConfig.RPC.RegisterIP, a.authConfig.RpcConfig.RPC.AutoSetPorts, a.authConfig.RpcConfig.RPC.Ports, a.authConfig.RpcConfig.RPC.RegisterIP, a.authConfig.RpcConfig.RPC.AutoSetPorts, a.authConfig.RpcConfig.RPC.Ports,
a.Index(), a.authConfig.Discovery.RpcService.Auth, nil, a.authConfig, auth.Start) a.Index(), a.authConfig.Discovery.RpcService.Auth, nil, a.authConfig,
[]string{
a.authConfig.RpcConfig.GetConfigFileName(),
a.authConfig.Share.GetConfigFileName(),
a.authConfig.RedisConfig.GetConfigFileName(),
a.authConfig.Discovery.GetConfigFileName(),
},
auth.Start)
} }

@ -59,5 +59,15 @@ func (a *ConversationRpcCmd) Exec() error {
func (a *ConversationRpcCmd) runE() error { func (a *ConversationRpcCmd) runE() error {
return startrpc.Start(a.ctx, &a.conversationConfig.Discovery, &a.conversationConfig.RpcConfig.Prometheus, a.conversationConfig.RpcConfig.RPC.ListenIP, return startrpc.Start(a.ctx, &a.conversationConfig.Discovery, &a.conversationConfig.RpcConfig.Prometheus, a.conversationConfig.RpcConfig.RPC.ListenIP,
a.conversationConfig.RpcConfig.RPC.RegisterIP, a.conversationConfig.RpcConfig.RPC.AutoSetPorts, a.conversationConfig.RpcConfig.RPC.Ports, a.conversationConfig.RpcConfig.RPC.RegisterIP, a.conversationConfig.RpcConfig.RPC.AutoSetPorts, a.conversationConfig.RpcConfig.RPC.Ports,
a.Index(), a.conversationConfig.Discovery.RpcService.Conversation, &a.conversationConfig.NotificationConfig, a.conversationConfig, conversation.Start) a.Index(), a.conversationConfig.Discovery.RpcService.Conversation, &a.conversationConfig.NotificationConfig, a.conversationConfig,
[]string{
a.conversationConfig.RpcConfig.GetConfigFileName(),
a.conversationConfig.RedisConfig.GetConfigFileName(),
a.conversationConfig.MongodbConfig.GetConfigFileName(),
a.conversationConfig.NotificationConfig.GetConfigFileName(),
a.conversationConfig.Share.GetConfigFileName(),
a.conversationConfig.LocalCacheConfig.GetConfigFileName(),
a.conversationConfig.Discovery.GetConfigFileName(),
},
conversation.Start)
} }

@ -60,5 +60,16 @@ func (a *FriendRpcCmd) Exec() error {
func (a *FriendRpcCmd) runE() error { func (a *FriendRpcCmd) runE() error {
return startrpc.Start(a.ctx, &a.relationConfig.Discovery, &a.relationConfig.RpcConfig.Prometheus, a.relationConfig.RpcConfig.RPC.ListenIP, return startrpc.Start(a.ctx, &a.relationConfig.Discovery, &a.relationConfig.RpcConfig.Prometheus, a.relationConfig.RpcConfig.RPC.ListenIP,
a.relationConfig.RpcConfig.RPC.RegisterIP, a.relationConfig.RpcConfig.RPC.AutoSetPorts, a.relationConfig.RpcConfig.RPC.Ports, a.relationConfig.RpcConfig.RPC.RegisterIP, a.relationConfig.RpcConfig.RPC.AutoSetPorts, a.relationConfig.RpcConfig.RPC.Ports,
a.Index(), a.relationConfig.Discovery.RpcService.Friend, &a.relationConfig.NotificationConfig, a.relationConfig, relation.Start) a.Index(), a.relationConfig.Discovery.RpcService.Friend, &a.relationConfig.NotificationConfig, a.relationConfig,
[]string{
a.relationConfig.RpcConfig.GetConfigFileName(),
a.relationConfig.RedisConfig.GetConfigFileName(),
a.relationConfig.MongodbConfig.GetConfigFileName(),
a.relationConfig.NotificationConfig.GetConfigFileName(),
a.relationConfig.Share.GetConfigFileName(),
a.relationConfig.WebhooksConfig.GetConfigFileName(),
a.relationConfig.LocalCacheConfig.GetConfigFileName(),
a.relationConfig.Discovery.GetConfigFileName(),
},
relation.Start)
} }

@ -61,5 +61,16 @@ func (a *GroupRpcCmd) Exec() error {
func (a *GroupRpcCmd) runE() error { func (a *GroupRpcCmd) runE() error {
return startrpc.Start(a.ctx, &a.groupConfig.Discovery, &a.groupConfig.RpcConfig.Prometheus, a.groupConfig.RpcConfig.RPC.ListenIP, return startrpc.Start(a.ctx, &a.groupConfig.Discovery, &a.groupConfig.RpcConfig.Prometheus, a.groupConfig.RpcConfig.RPC.ListenIP,
a.groupConfig.RpcConfig.RPC.RegisterIP, a.groupConfig.RpcConfig.RPC.AutoSetPorts, a.groupConfig.RpcConfig.RPC.Ports, a.groupConfig.RpcConfig.RPC.RegisterIP, a.groupConfig.RpcConfig.RPC.AutoSetPorts, a.groupConfig.RpcConfig.RPC.Ports,
a.Index(), a.groupConfig.Discovery.RpcService.Group, &a.groupConfig.NotificationConfig, a.groupConfig, group.Start, versionctx.EnableVersionCtx()) a.Index(), a.groupConfig.Discovery.RpcService.Group, &a.groupConfig.NotificationConfig, a.groupConfig,
[]string{
a.groupConfig.RpcConfig.GetConfigFileName(),
a.groupConfig.RedisConfig.GetConfigFileName(),
a.groupConfig.MongodbConfig.GetConfigFileName(),
a.groupConfig.NotificationConfig.GetConfigFileName(),
a.groupConfig.Share.GetConfigFileName(),
a.groupConfig.WebhooksConfig.GetConfigFileName(),
a.groupConfig.LocalCacheConfig.GetConfigFileName(),
a.groupConfig.Discovery.GetConfigFileName(),
},
group.Start, versionctx.EnableVersionCtx())
} }

@ -61,5 +61,17 @@ func (a *MsgRpcCmd) Exec() error {
func (a *MsgRpcCmd) runE() error { func (a *MsgRpcCmd) runE() error {
return startrpc.Start(a.ctx, &a.msgConfig.Discovery, &a.msgConfig.RpcConfig.Prometheus, a.msgConfig.RpcConfig.RPC.ListenIP, return startrpc.Start(a.ctx, &a.msgConfig.Discovery, &a.msgConfig.RpcConfig.Prometheus, a.msgConfig.RpcConfig.RPC.ListenIP,
a.msgConfig.RpcConfig.RPC.RegisterIP, a.msgConfig.RpcConfig.RPC.AutoSetPorts, a.msgConfig.RpcConfig.RPC.Ports, a.msgConfig.RpcConfig.RPC.RegisterIP, a.msgConfig.RpcConfig.RPC.AutoSetPorts, a.msgConfig.RpcConfig.RPC.Ports,
a.Index(), a.msgConfig.Discovery.RpcService.Msg, &a.msgConfig.NotificationConfig, a.msgConfig, msg.Start) a.Index(), a.msgConfig.Discovery.RpcService.Msg, &a.msgConfig.NotificationConfig, a.msgConfig,
[]string{
a.msgConfig.RpcConfig.GetConfigFileName(),
a.msgConfig.RedisConfig.GetConfigFileName(),
a.msgConfig.MongodbConfig.GetConfigFileName(),
a.msgConfig.KafkaConfig.GetConfigFileName(),
a.msgConfig.NotificationConfig.GetConfigFileName(),
a.msgConfig.Share.GetConfigFileName(),
a.msgConfig.WebhooksConfig.GetConfigFileName(),
a.msgConfig.LocalCacheConfig.GetConfigFileName(),
a.msgConfig.Discovery.GetConfigFileName(),
},
msg.Start)
} }

@ -61,5 +61,16 @@ func (a *PushRpcCmd) Exec() error {
func (a *PushRpcCmd) runE() error { func (a *PushRpcCmd) runE() error {
return startrpc.Start(a.ctx, &a.pushConfig.Discovery, &a.pushConfig.RpcConfig.Prometheus, a.pushConfig.RpcConfig.RPC.ListenIP, return startrpc.Start(a.ctx, &a.pushConfig.Discovery, &a.pushConfig.RpcConfig.Prometheus, a.pushConfig.RpcConfig.RPC.ListenIP,
a.pushConfig.RpcConfig.RPC.RegisterIP, a.pushConfig.RpcConfig.RPC.AutoSetPorts, a.pushConfig.RpcConfig.RPC.Ports, a.pushConfig.RpcConfig.RPC.RegisterIP, a.pushConfig.RpcConfig.RPC.AutoSetPorts, a.pushConfig.RpcConfig.RPC.Ports,
a.Index(), a.pushConfig.Discovery.RpcService.Push, &a.pushConfig.NotificationConfig, a.pushConfig, push.Start) a.Index(), a.pushConfig.Discovery.RpcService.Push, &a.pushConfig.NotificationConfig, a.pushConfig,
[]string{
a.pushConfig.RpcConfig.GetConfigFileName(),
a.pushConfig.RedisConfig.GetConfigFileName(),
a.pushConfig.KafkaConfig.GetConfigFileName(),
a.pushConfig.NotificationConfig.GetConfigFileName(),
a.pushConfig.Share.GetConfigFileName(),
a.pushConfig.WebhooksConfig.GetConfigFileName(),
a.pushConfig.LocalCacheConfig.GetConfigFileName(),
a.pushConfig.Discovery.GetConfigFileName(),
},
push.Start)
} }

@ -60,5 +60,16 @@ func (a *ThirdRpcCmd) Exec() error {
func (a *ThirdRpcCmd) runE() error { func (a *ThirdRpcCmd) runE() error {
return startrpc.Start(a.ctx, &a.thirdConfig.Discovery, &a.thirdConfig.RpcConfig.Prometheus, a.thirdConfig.RpcConfig.RPC.ListenIP, return startrpc.Start(a.ctx, &a.thirdConfig.Discovery, &a.thirdConfig.RpcConfig.Prometheus, a.thirdConfig.RpcConfig.RPC.ListenIP,
a.thirdConfig.RpcConfig.RPC.RegisterIP, a.thirdConfig.RpcConfig.RPC.AutoSetPorts, a.thirdConfig.RpcConfig.RPC.Ports, a.thirdConfig.RpcConfig.RPC.RegisterIP, a.thirdConfig.RpcConfig.RPC.AutoSetPorts, a.thirdConfig.RpcConfig.RPC.Ports,
a.Index(), a.thirdConfig.Discovery.RpcService.Third, &a.thirdConfig.NotificationConfig, a.thirdConfig, third.Start) a.Index(), a.thirdConfig.Discovery.RpcService.Third, &a.thirdConfig.NotificationConfig, a.thirdConfig,
[]string{
a.thirdConfig.RpcConfig.GetConfigFileName(),
a.thirdConfig.RedisConfig.GetConfigFileName(),
a.thirdConfig.MongodbConfig.GetConfigFileName(),
a.thirdConfig.NotificationConfig.GetConfigFileName(),
a.thirdConfig.Share.GetConfigFileName(),
a.thirdConfig.MinioConfig.GetConfigFileName(),
a.thirdConfig.LocalCacheConfig.GetConfigFileName(),
a.thirdConfig.Discovery.GetConfigFileName(),
},
third.Start)
} }

@ -61,5 +61,17 @@ func (a *UserRpcCmd) Exec() error {
func (a *UserRpcCmd) runE() error { func (a *UserRpcCmd) runE() error {
return startrpc.Start(a.ctx, &a.userConfig.Discovery, &a.userConfig.RpcConfig.Prometheus, a.userConfig.RpcConfig.RPC.ListenIP, return startrpc.Start(a.ctx, &a.userConfig.Discovery, &a.userConfig.RpcConfig.Prometheus, a.userConfig.RpcConfig.RPC.ListenIP,
a.userConfig.RpcConfig.RPC.RegisterIP, a.userConfig.RpcConfig.RPC.AutoSetPorts, a.userConfig.RpcConfig.RPC.Ports, a.userConfig.RpcConfig.RPC.RegisterIP, a.userConfig.RpcConfig.RPC.AutoSetPorts, a.userConfig.RpcConfig.RPC.Ports,
a.Index(), a.userConfig.Discovery.RpcService.User, &a.userConfig.NotificationConfig, a.userConfig, user.Start) a.Index(), a.userConfig.Discovery.RpcService.User, &a.userConfig.NotificationConfig, a.userConfig,
[]string{
a.userConfig.RpcConfig.GetConfigFileName(),
a.userConfig.RedisConfig.GetConfigFileName(),
a.userConfig.MongodbConfig.GetConfigFileName(),
a.userConfig.KafkaConfig.GetConfigFileName(),
a.userConfig.NotificationConfig.GetConfigFileName(),
a.userConfig.Share.GetConfigFileName(),
a.userConfig.WebhooksConfig.GetConfigFileName(),
a.userConfig.LocalCacheConfig.GetConfigFileName(),
a.userConfig.Discovery.GetConfigFileName(),
},
user.Start)
} }

@ -5,6 +5,7 @@ import (
"os" "os"
"os/exec" "os/exec"
"runtime" "runtime"
"sync"
"syscall" "syscall"
"github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/errs"
@ -20,6 +21,7 @@ const (
type ConfigManager struct { type ConfigManager struct {
client *clientv3.Client client *clientv3.Client
watchConfigNames []string watchConfigNames []string
lock sync.RWMutex
} }
func BuildKey(s string) string { func BuildKey(s string) string {
@ -47,10 +49,12 @@ func (c *ConfigManager) Watch(ctx context.Context) {
for _, event := range watchResp.Events { for _, event := range watchResp.Events {
if event.IsModify() { if event.IsModify() {
if datautil.Contain(string(event.Kv.Key), c.watchConfigNames...) { if datautil.Contain(string(event.Kv.Key), c.watchConfigNames...) {
c.lock.Lock()
err := restartServer(ctx) err := restartServer(ctx)
if err != nil { if err != nil {
log.ZError(ctx, "restart server err", err) log.ZError(ctx, "restart server err", err)
} }
c.lock.Unlock()
} }
} }
} }

@ -27,6 +27,7 @@ import (
"time" "time"
conf "github.com/openimsdk/open-im-server/v3/pkg/common/config" conf "github.com/openimsdk/open-im-server/v3/pkg/common/config"
disetcd "github.com/openimsdk/open-im-server/v3/pkg/common/discovery/etcd"
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient" "github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
"github.com/openimsdk/tools/discovery/etcd" "github.com/openimsdk/tools/discovery/etcd"
"github.com/openimsdk/tools/utils/datautil" "github.com/openimsdk/tools/utils/datautil"
@ -49,6 +50,7 @@ import (
// Start rpc server. // Start rpc server.
func Start[T any](ctx context.Context, discovery *conf.Discovery, prometheusConfig *conf.Prometheus, listenIP, func Start[T any](ctx context.Context, discovery *conf.Discovery, prometheusConfig *conf.Prometheus, listenIP,
registerIP string, autoSetPorts bool, rpcPorts []int, index int, rpcRegisterName string, notification *conf.Notification, config T, registerIP string, autoSetPorts bool, rpcPorts []int, index int, rpcRegisterName string, notification *conf.Notification, config T,
watchConfigNames []string,
rpcFn func(ctx context.Context, config T, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error, rpcFn func(ctx context.Context, config T, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error,
options ...grpc.ServerOption) error { options ...grpc.ServerOption) error {
@ -196,6 +198,11 @@ func Start[T any](ctx context.Context, discovery *conf.Discovery, prometheusConf
} }
}() }()
if discovery.Enable == conf.ETCD {
cm := disetcd.NewConfigManager(client.(*etcd.SvcDiscoveryRegistryImpl).GetClient(), watchConfigNames)
cm.Watch(ctx)
}
sigs := make(chan os.Signal, 1) sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGTERM) signal.Notify(sigs, syscall.SIGTERM)
select { select {

Loading…
Cancel
Save