diff --git a/internal/api/config_manager.go b/internal/api/config_manager.go index 72f33472e..2964f21b3 100644 --- a/internal/api/config_manager.go +++ b/internal/api/config_manager.go @@ -51,7 +51,7 @@ func (cm *ConfigManager) GetConfig(c *gin.Context) { apiresp.GinError(c, errs.ErrArgs.WithDetail("config name not found").Wrap()) return } - b, err := json.Marshal(cm.config) + b, err := json.Marshal(conf) if err != nil { apiresp.GinError(c, err) return diff --git a/internal/api/init.go b/internal/api/init.go index 0686b6628..6414513de 100644 --- a/internal/api/init.go +++ b/internal/api/init.go @@ -28,6 +28,7 @@ import ( conf "github.com/openimsdk/open-im-server/v3/pkg/common/config" kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discovery" + disetcd "github.com/openimsdk/open-im-server/v3/pkg/common/discovery/etcd" "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" "github.com/openimsdk/open-im-server/v3/pkg/rpcclient" "github.com/openimsdk/tools/discovery/etcd" @@ -143,6 +144,11 @@ func Start(ctx context.Context, index int, config *Config) error { } }() + if config.Discovery.Enable == conf.ETCD { + cm := disetcd.NewConfigManager(client.(*etcd.SvcDiscoveryRegistryImpl).GetClient(), config.GetConfigNames()) + cm.Watch(ctx) + } + sigs := make(chan os.Signal, 1) signal.Notify(sigs, syscall.SIGTERM) diff --git a/internal/msggateway/hub_server.go b/internal/msggateway/hub_server.go index f6c12350c..43386ed9a 100644 --- a/internal/msggateway/hub_server.go +++ b/internal/msggateway/hub_server.go @@ -49,6 +49,13 @@ func (s *Server) Start(ctx context.Context, index int, conf *Config) error { conf.Discovery.RpcService.MessageGateway, nil, conf, + []string{ + conf.Share.GetConfigFileName(), + conf.Discovery.GetConfigFileName(), + conf.MsgGateway.GetConfigFileName(), + conf.WebhooksConfig.GetConfigFileName(), + conf.RedisConfig.GetConfigFileName(), + }, s.InitServer, ) } diff --git a/internal/msgtransfer/init.go b/internal/msgtransfer/init.go index 506984ffc..bf02f6a7b 100644 --- a/internal/msgtransfer/init.go +++ b/internal/msgtransfer/init.go @@ -25,6 +25,7 @@ import ( "strconv" "syscall" + disetcd "github.com/openimsdk/open-im-server/v3/pkg/common/discovery/etcd" "github.com/openimsdk/open-im-server/v3/pkg/rpcclient" "github.com/openimsdk/tools/discovery" "github.com/openimsdk/tools/discovery/etcd" @@ -97,6 +98,19 @@ func Start(ctx context.Context, index int, config *Config) error { return err } + if config.Discovery.Enable == conf.ETCD { + cm := disetcd.NewConfigManager(client.(*etcd.SvcDiscoveryRegistryImpl).GetClient(), []string{ + config.MsgTransfer.GetConfigFileName(), + config.RedisConfig.GetConfigFileName(), + config.MongodbConfig.GetConfigFileName(), + config.KafkaConfig.GetConfigFileName(), + config.Share.GetConfigFileName(), + config.WebhooksConfig.GetConfigFileName(), + config.Discovery.GetConfigFileName(), + }) + cm.Watch(ctx) + } + msgModel := redis.NewMsgCache(rdb) msgDocModel, err := mgo.NewMsgMongo(mgocli.GetDB()) if err != nil { @@ -130,6 +144,7 @@ func Start(ctx context.Context, index int, config *Config) error { historyMongoCH: historyMongoCH, runTimeEnv: runTimeEnv, } + return msgTransfer.Start(index, config, client) } diff --git a/pkg/apistruct/config_manager.go b/pkg/apistruct/config_manager.go index a538cc52c..9b734fbe0 100644 --- a/pkg/apistruct/config_manager.go +++ b/pkg/apistruct/config_manager.go @@ -1,16 +1,16 @@ package apistruct type GetConfigReq struct { - ConfigName string `json:"config_name"` + ConfigName string `json:"configName"` } type GetConfigListResp struct { Environment string `json:"environment"` Version string `json:"version"` - ConfigNames []string `json:"config_names"` + ConfigNames []string `json:"configNames"` } type SetConfigReq struct { - ConfigName string `json:"config_name"` + ConfigName string `json:"configName"` Data []byte `json:"data"` } diff --git a/pkg/common/cmd/auth.go b/pkg/common/cmd/auth.go index 2215dce89..a5ab3fea7 100644 --- a/pkg/common/cmd/auth.go +++ b/pkg/common/cmd/auth.go @@ -57,5 +57,12 @@ func (a *AuthRpcCmd) Exec() error { func (a *AuthRpcCmd) runE() error { return startrpc.Start(a.ctx, &a.authConfig.Discovery, &a.authConfig.RpcConfig.Prometheus, a.authConfig.RpcConfig.RPC.ListenIP, a.authConfig.RpcConfig.RPC.RegisterIP, a.authConfig.RpcConfig.RPC.AutoSetPorts, a.authConfig.RpcConfig.RPC.Ports, - a.Index(), a.authConfig.Discovery.RpcService.Auth, nil, a.authConfig, auth.Start) + a.Index(), a.authConfig.Discovery.RpcService.Auth, nil, a.authConfig, + []string{ + a.authConfig.RpcConfig.GetConfigFileName(), + a.authConfig.Share.GetConfigFileName(), + a.authConfig.RedisConfig.GetConfigFileName(), + a.authConfig.Discovery.GetConfigFileName(), + }, + auth.Start) } diff --git a/pkg/common/cmd/conversation.go b/pkg/common/cmd/conversation.go index 05b5cadcd..12c29a873 100644 --- a/pkg/common/cmd/conversation.go +++ b/pkg/common/cmd/conversation.go @@ -59,5 +59,15 @@ func (a *ConversationRpcCmd) Exec() error { func (a *ConversationRpcCmd) runE() error { return startrpc.Start(a.ctx, &a.conversationConfig.Discovery, &a.conversationConfig.RpcConfig.Prometheus, a.conversationConfig.RpcConfig.RPC.ListenIP, a.conversationConfig.RpcConfig.RPC.RegisterIP, a.conversationConfig.RpcConfig.RPC.AutoSetPorts, a.conversationConfig.RpcConfig.RPC.Ports, - a.Index(), a.conversationConfig.Discovery.RpcService.Conversation, &a.conversationConfig.NotificationConfig, a.conversationConfig, conversation.Start) + a.Index(), a.conversationConfig.Discovery.RpcService.Conversation, &a.conversationConfig.NotificationConfig, a.conversationConfig, + []string{ + a.conversationConfig.RpcConfig.GetConfigFileName(), + a.conversationConfig.RedisConfig.GetConfigFileName(), + a.conversationConfig.MongodbConfig.GetConfigFileName(), + a.conversationConfig.NotificationConfig.GetConfigFileName(), + a.conversationConfig.Share.GetConfigFileName(), + a.conversationConfig.LocalCacheConfig.GetConfigFileName(), + a.conversationConfig.Discovery.GetConfigFileName(), + }, + conversation.Start) } diff --git a/pkg/common/cmd/friend.go b/pkg/common/cmd/friend.go index bb377b4af..209d481bb 100644 --- a/pkg/common/cmd/friend.go +++ b/pkg/common/cmd/friend.go @@ -60,5 +60,16 @@ func (a *FriendRpcCmd) Exec() error { func (a *FriendRpcCmd) runE() error { return startrpc.Start(a.ctx, &a.relationConfig.Discovery, &a.relationConfig.RpcConfig.Prometheus, a.relationConfig.RpcConfig.RPC.ListenIP, a.relationConfig.RpcConfig.RPC.RegisterIP, a.relationConfig.RpcConfig.RPC.AutoSetPorts, a.relationConfig.RpcConfig.RPC.Ports, - a.Index(), a.relationConfig.Discovery.RpcService.Friend, &a.relationConfig.NotificationConfig, a.relationConfig, relation.Start) + a.Index(), a.relationConfig.Discovery.RpcService.Friend, &a.relationConfig.NotificationConfig, a.relationConfig, + []string{ + a.relationConfig.RpcConfig.GetConfigFileName(), + a.relationConfig.RedisConfig.GetConfigFileName(), + a.relationConfig.MongodbConfig.GetConfigFileName(), + a.relationConfig.NotificationConfig.GetConfigFileName(), + a.relationConfig.Share.GetConfigFileName(), + a.relationConfig.WebhooksConfig.GetConfigFileName(), + a.relationConfig.LocalCacheConfig.GetConfigFileName(), + a.relationConfig.Discovery.GetConfigFileName(), + }, + relation.Start) } diff --git a/pkg/common/cmd/group.go b/pkg/common/cmd/group.go index de7c7102c..23fd460f7 100644 --- a/pkg/common/cmd/group.go +++ b/pkg/common/cmd/group.go @@ -61,5 +61,16 @@ func (a *GroupRpcCmd) Exec() error { func (a *GroupRpcCmd) runE() error { return startrpc.Start(a.ctx, &a.groupConfig.Discovery, &a.groupConfig.RpcConfig.Prometheus, a.groupConfig.RpcConfig.RPC.ListenIP, a.groupConfig.RpcConfig.RPC.RegisterIP, a.groupConfig.RpcConfig.RPC.AutoSetPorts, a.groupConfig.RpcConfig.RPC.Ports, - a.Index(), a.groupConfig.Discovery.RpcService.Group, &a.groupConfig.NotificationConfig, a.groupConfig, group.Start, versionctx.EnableVersionCtx()) + a.Index(), a.groupConfig.Discovery.RpcService.Group, &a.groupConfig.NotificationConfig, a.groupConfig, + []string{ + a.groupConfig.RpcConfig.GetConfigFileName(), + a.groupConfig.RedisConfig.GetConfigFileName(), + a.groupConfig.MongodbConfig.GetConfigFileName(), + a.groupConfig.NotificationConfig.GetConfigFileName(), + a.groupConfig.Share.GetConfigFileName(), + a.groupConfig.WebhooksConfig.GetConfigFileName(), + a.groupConfig.LocalCacheConfig.GetConfigFileName(), + a.groupConfig.Discovery.GetConfigFileName(), + }, + group.Start, versionctx.EnableVersionCtx()) } diff --git a/pkg/common/cmd/msg.go b/pkg/common/cmd/msg.go index eb5938f7c..b6f0b6131 100644 --- a/pkg/common/cmd/msg.go +++ b/pkg/common/cmd/msg.go @@ -61,5 +61,17 @@ func (a *MsgRpcCmd) Exec() error { func (a *MsgRpcCmd) runE() error { return startrpc.Start(a.ctx, &a.msgConfig.Discovery, &a.msgConfig.RpcConfig.Prometheus, a.msgConfig.RpcConfig.RPC.ListenIP, a.msgConfig.RpcConfig.RPC.RegisterIP, a.msgConfig.RpcConfig.RPC.AutoSetPorts, a.msgConfig.RpcConfig.RPC.Ports, - a.Index(), a.msgConfig.Discovery.RpcService.Msg, &a.msgConfig.NotificationConfig, a.msgConfig, msg.Start) + a.Index(), a.msgConfig.Discovery.RpcService.Msg, &a.msgConfig.NotificationConfig, a.msgConfig, + []string{ + a.msgConfig.RpcConfig.GetConfigFileName(), + a.msgConfig.RedisConfig.GetConfigFileName(), + a.msgConfig.MongodbConfig.GetConfigFileName(), + a.msgConfig.KafkaConfig.GetConfigFileName(), + a.msgConfig.NotificationConfig.GetConfigFileName(), + a.msgConfig.Share.GetConfigFileName(), + a.msgConfig.WebhooksConfig.GetConfigFileName(), + a.msgConfig.LocalCacheConfig.GetConfigFileName(), + a.msgConfig.Discovery.GetConfigFileName(), + }, + msg.Start) } diff --git a/pkg/common/cmd/push.go b/pkg/common/cmd/push.go index 90bcfefb3..41b9d56e6 100644 --- a/pkg/common/cmd/push.go +++ b/pkg/common/cmd/push.go @@ -61,5 +61,16 @@ func (a *PushRpcCmd) Exec() error { func (a *PushRpcCmd) runE() error { return startrpc.Start(a.ctx, &a.pushConfig.Discovery, &a.pushConfig.RpcConfig.Prometheus, a.pushConfig.RpcConfig.RPC.ListenIP, a.pushConfig.RpcConfig.RPC.RegisterIP, a.pushConfig.RpcConfig.RPC.AutoSetPorts, a.pushConfig.RpcConfig.RPC.Ports, - a.Index(), a.pushConfig.Discovery.RpcService.Push, &a.pushConfig.NotificationConfig, a.pushConfig, push.Start) + a.Index(), a.pushConfig.Discovery.RpcService.Push, &a.pushConfig.NotificationConfig, a.pushConfig, + []string{ + a.pushConfig.RpcConfig.GetConfigFileName(), + a.pushConfig.RedisConfig.GetConfigFileName(), + a.pushConfig.KafkaConfig.GetConfigFileName(), + a.pushConfig.NotificationConfig.GetConfigFileName(), + a.pushConfig.Share.GetConfigFileName(), + a.pushConfig.WebhooksConfig.GetConfigFileName(), + a.pushConfig.LocalCacheConfig.GetConfigFileName(), + a.pushConfig.Discovery.GetConfigFileName(), + }, + push.Start) } diff --git a/pkg/common/cmd/third.go b/pkg/common/cmd/third.go index 3efebb92f..5086116b5 100644 --- a/pkg/common/cmd/third.go +++ b/pkg/common/cmd/third.go @@ -60,5 +60,16 @@ func (a *ThirdRpcCmd) Exec() error { func (a *ThirdRpcCmd) runE() error { return startrpc.Start(a.ctx, &a.thirdConfig.Discovery, &a.thirdConfig.RpcConfig.Prometheus, a.thirdConfig.RpcConfig.RPC.ListenIP, a.thirdConfig.RpcConfig.RPC.RegisterIP, a.thirdConfig.RpcConfig.RPC.AutoSetPorts, a.thirdConfig.RpcConfig.RPC.Ports, - a.Index(), a.thirdConfig.Discovery.RpcService.Third, &a.thirdConfig.NotificationConfig, a.thirdConfig, third.Start) + a.Index(), a.thirdConfig.Discovery.RpcService.Third, &a.thirdConfig.NotificationConfig, a.thirdConfig, + []string{ + a.thirdConfig.RpcConfig.GetConfigFileName(), + a.thirdConfig.RedisConfig.GetConfigFileName(), + a.thirdConfig.MongodbConfig.GetConfigFileName(), + a.thirdConfig.NotificationConfig.GetConfigFileName(), + a.thirdConfig.Share.GetConfigFileName(), + a.thirdConfig.MinioConfig.GetConfigFileName(), + a.thirdConfig.LocalCacheConfig.GetConfigFileName(), + a.thirdConfig.Discovery.GetConfigFileName(), + }, + third.Start) } diff --git a/pkg/common/cmd/user.go b/pkg/common/cmd/user.go index c74013020..61125e0c3 100644 --- a/pkg/common/cmd/user.go +++ b/pkg/common/cmd/user.go @@ -61,5 +61,17 @@ func (a *UserRpcCmd) Exec() error { func (a *UserRpcCmd) runE() error { return startrpc.Start(a.ctx, &a.userConfig.Discovery, &a.userConfig.RpcConfig.Prometheus, a.userConfig.RpcConfig.RPC.ListenIP, a.userConfig.RpcConfig.RPC.RegisterIP, a.userConfig.RpcConfig.RPC.AutoSetPorts, a.userConfig.RpcConfig.RPC.Ports, - a.Index(), a.userConfig.Discovery.RpcService.User, &a.userConfig.NotificationConfig, a.userConfig, user.Start) + a.Index(), a.userConfig.Discovery.RpcService.User, &a.userConfig.NotificationConfig, a.userConfig, + []string{ + a.userConfig.RpcConfig.GetConfigFileName(), + a.userConfig.RedisConfig.GetConfigFileName(), + a.userConfig.MongodbConfig.GetConfigFileName(), + a.userConfig.KafkaConfig.GetConfigFileName(), + a.userConfig.NotificationConfig.GetConfigFileName(), + a.userConfig.Share.GetConfigFileName(), + a.userConfig.WebhooksConfig.GetConfigFileName(), + a.userConfig.LocalCacheConfig.GetConfigFileName(), + a.userConfig.Discovery.GetConfigFileName(), + }, + user.Start) } diff --git a/pkg/common/discovery/etcd/config_manager.go b/pkg/common/discovery/etcd/config_manager.go index 7d62d1919..d7d6153ea 100644 --- a/pkg/common/discovery/etcd/config_manager.go +++ b/pkg/common/discovery/etcd/config_manager.go @@ -5,6 +5,7 @@ import ( "os" "os/exec" "runtime" + "sync" "syscall" "github.com/openimsdk/tools/errs" @@ -20,6 +21,7 @@ const ( type ConfigManager struct { client *clientv3.Client watchConfigNames []string + lock sync.RWMutex } func BuildKey(s string) string { @@ -47,10 +49,12 @@ func (c *ConfigManager) Watch(ctx context.Context) { for _, event := range watchResp.Events { if event.IsModify() { if datautil.Contain(string(event.Kv.Key), c.watchConfigNames...) { + c.lock.Lock() err := restartServer(ctx) if err != nil { log.ZError(ctx, "restart server err", err) } + c.lock.Unlock() } } } diff --git a/pkg/common/startrpc/start.go b/pkg/common/startrpc/start.go index c9be1d8a6..e6637365b 100644 --- a/pkg/common/startrpc/start.go +++ b/pkg/common/startrpc/start.go @@ -27,6 +27,7 @@ import ( "time" conf "github.com/openimsdk/open-im-server/v3/pkg/common/config" + disetcd "github.com/openimsdk/open-im-server/v3/pkg/common/discovery/etcd" "github.com/openimsdk/open-im-server/v3/pkg/rpcclient" "github.com/openimsdk/tools/discovery/etcd" "github.com/openimsdk/tools/utils/datautil" @@ -49,6 +50,7 @@ import ( // Start rpc server. func Start[T any](ctx context.Context, discovery *conf.Discovery, prometheusConfig *conf.Prometheus, listenIP, registerIP string, autoSetPorts bool, rpcPorts []int, index int, rpcRegisterName string, notification *conf.Notification, config T, + watchConfigNames []string, rpcFn func(ctx context.Context, config T, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error, options ...grpc.ServerOption) error { @@ -196,6 +198,11 @@ func Start[T any](ctx context.Context, discovery *conf.Discovery, prometheusConf } }() + if discovery.Enable == conf.ETCD { + cm := disetcd.NewConfigManager(client.(*etcd.SvcDiscoveryRegistryImpl).GetClient(), watchConfigNames) + cm.Watch(ctx) + } + sigs := make(chan os.Signal, 1) signal.Notify(sigs, syscall.SIGTERM) select {