diff --git a/config/openim-push.yml b/config/openim-push.yml index e98324620..5db5b541a 100644 --- a/config/openim-push.yml +++ b/config/openim-push.yml @@ -21,7 +21,7 @@ prometheus: maxConcurrentWorkers: 3 #Use geTui for offline push notifications, or choose fcm or jpns; corresponding configuration settings must be specified. enable: -geTui: +getui: pushUrl: https://restapi.getui.com/v2/$appId masterSecret: appKey: diff --git a/internal/api/config_manager.go b/internal/api/config_manager.go index c498b11b4..1ea312aac 100644 --- a/internal/api/config_manager.go +++ b/internal/api/config_manager.go @@ -3,6 +3,8 @@ package api import ( "encoding/json" "reflect" + "strconv" + "time" "github.com/gin-gonic/gin" "github.com/openimsdk/open-im-server/v3/pkg/apistruct" @@ -159,12 +161,12 @@ func compareAndSave[T any](c *gin.Context, old any, req *apistruct.SetConfigReq, } func (cm *ConfigManager) ResetConfig(c *gin.Context) { - go cm.restart(c) + go cm.resetConfig(c) apiresp.GinSuccess(c, nil) } -func (cm *ConfigManager) restart(c *gin.Context) { - <-etcd.CanRestart +func (cm *ConfigManager) resetConfig(c *gin.Context) { + txn := cm.client.Txn(c) type initConf struct { old any new any @@ -220,11 +222,25 @@ func (cm *ConfigManager) restart(c *gin.Context) { log.ZError(c, "marshal config failed", err) continue } - _, err = cm.client.Put(c, etcd.BuildKey(k), string(data)) - if err != nil { - log.ZError(c, "save to etcd failed", err) - continue - } + txn = txn.Then(clientv3.OpPut(etcd.BuildKey(k), string(data))) + } + _, err := txn.Commit() + if err != nil { + log.ZError(c, "commit etcd txn failed", err) + return + } +} + +func (cm *ConfigManager) Restart(c *gin.Context) { + go cm.restart(c) + apiresp.GinSuccess(c, nil) +} + +func (cm *ConfigManager) restart(c *gin.Context) { + time.Sleep(time.Millisecond * 200) // wait for Restart http call return + t := time.Now().Unix() + _, err := cm.client.Put(c, etcd.BuildKey(etcd.RestartKey), strconv.Itoa(int(t))) + if err != nil { + log.ZError(c, "restart etcd put key failed", err) } - etcd.CanRestart <- struct{}{} } diff --git a/internal/api/router.go b/internal/api/router.go index fa3ac2c09..789b8205d 100644 --- a/internal/api/router.go +++ b/internal/api/router.go @@ -300,18 +300,22 @@ func newGinRouter(ctx context.Context, client discovery.SvcDiscoveryRegistry, cf proDiscoveryGroup.GET("/msg_transfer", pd.MessageTransfer) } + var etcdClient *clientv3.Client + if cfg.Discovery.Enable == config.ETCD { + etcdClient = client.(*etcd.SvcDiscoveryRegistryImpl).GetClient() + } + cm := NewConfigManager(cfg.Share.IMAdminUserID, cfg.AllConfig, etcdClient, cfg.ConfigPath, cfg.RuntimeEnv) { - var etcdClient *clientv3.Client - if cfg.Discovery.Enable == config.ETCD { - etcdClient = client.(*etcd.SvcDiscoveryRegistryImpl).GetClient() - } - cm := NewConfigManager(cfg.Share.IMAdminUserID, cfg.AllConfig, etcdClient, cfg.ConfigPath, cfg.RuntimeEnv) + configGroup := r.Group("/config", cm.CheckAdmin) configGroup.POST("/get_config_list", cm.GetConfigList) configGroup.POST("/get_config", cm.GetConfig) configGroup.POST("/set_config", cm.SetConfig) configGroup.POST("/reset_config", cm.ResetConfig) } + { + r.POST("/restart", cm.Restart) + } return r, nil } diff --git a/internal/msgtransfer/init.go b/internal/msgtransfer/init.go index c519a7e2a..b32732b60 100644 --- a/internal/msgtransfer/init.go +++ b/internal/msgtransfer/init.go @@ -103,6 +103,7 @@ func Start(ctx context.Context, index int, config *Config) error { config.Share.GetConfigFileName(), config.WebhooksConfig.GetConfigFileName(), config.Discovery.GetConfigFileName(), + conf.LogConfigFileName, }) cm.Watch(ctx) } diff --git a/pkg/common/discovery/etcd/config_manager.go b/pkg/common/discovery/etcd/config_manager.go index 37fd3b031..b2e47a54e 100644 --- a/pkg/common/discovery/etcd/config_manager.go +++ b/pkg/common/discovery/etcd/config_manager.go @@ -5,6 +5,7 @@ import ( "os" "os/exec" "runtime" + "sync" "syscall" "github.com/openimsdk/tools/errs" @@ -15,18 +16,13 @@ import ( const ( ConfigKeyPrefix = "/open-im/config/" + RestartKey = "restart" ) var ( - ShutDowns []func() error - CanRestart chan struct{} + ShutDowns []func() error ) -func init() { - CanRestart = make(chan struct{}, 1) - CanRestart <- struct{}{} -} - func RegisterShutDown(shutDown ...func() error) { ShutDowns = append(ShutDowns, shutDown...) } @@ -34,6 +30,7 @@ func RegisterShutDown(shutDown ...func() error) { type ConfigManager struct { client *clientv3.Client watchConfigNames []string + lock sync.Mutex } func BuildKey(s string) string { @@ -43,7 +40,7 @@ func BuildKey(s string) string { func NewConfigManager(client *clientv3.Client, configNames []string) *ConfigManager { return &ConfigManager{ client: client, - watchConfigNames: datautil.Batch(func(s string) string { return BuildKey(s) }, configNames)} + watchConfigNames: datautil.Batch(func(s string) string { return BuildKey(s) }, append(configNames, RestartKey))} } func (c *ConfigManager) Watch(ctx context.Context) { @@ -61,12 +58,12 @@ func (c *ConfigManager) Watch(ctx context.Context) { for _, event := range watchResp.Events { if event.IsModify() { if datautil.Contain(string(event.Kv.Key), c.watchConfigNames...) { - <-CanRestart + c.lock.Lock() err := restartServer(ctx) if err != nil { log.ZError(ctx, "restart server err", err) - CanRestart <- struct{}{} } + c.lock.Unlock() } } } diff --git a/pkg/common/startrpc/start.go b/pkg/common/startrpc/start.go index d610e8e5a..4cc414603 100644 --- a/pkg/common/startrpc/start.go +++ b/pkg/common/startrpc/start.go @@ -53,6 +53,7 @@ func Start[T any](ctx context.Context, discovery *conf.Discovery, prometheusConf rpcFn func(ctx context.Context, config T, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error, options ...grpc.ServerOption) error { + watchConfigNames = append(watchConfigNames, conf.LogConfigFileName) var ( rpcTcpAddr string netDone = make(chan struct{}, 2)