From d498fd90ca1d6d33869efa76ada136cb68c35299 Mon Sep 17 00:00:00 2001 From: icey-yu <1186114839@qq.com> Date: Sat, 4 Jan 2025 15:53:51 +0800 Subject: [PATCH] fix: config --- go.mod | 2 +- go.sum | 4 +-- internal/api/init.go | 4 ++- internal/msggateway/hub_server.go | 6 +++- internal/msgtransfer/init.go | 2 +- internal/tools/cron_task.go | 34 +++++++++++++++-------- pkg/common/cmd/auth.go | 3 ++ pkg/common/cmd/conversation.go | 2 +- pkg/common/cmd/friend.go | 2 +- pkg/common/cmd/group.go | 2 +- pkg/common/cmd/msg.go | 2 +- pkg/common/cmd/push.go | 3 ++ pkg/common/cmd/root.go | 2 +- pkg/common/cmd/third.go | 2 +- pkg/common/cmd/user.go | 2 +- pkg/common/discovery/discoveryregister.go | 3 +- pkg/common/startrpc/start.go | 4 +-- 17 files changed, 52 insertions(+), 27 deletions(-) diff --git a/go.mod b/go.mod index 387cb7304..146645451 100644 --- a/go.mod +++ b/go.mod @@ -13,7 +13,7 @@ require ( github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 github.com/mitchellh/mapstructure v1.5.0 github.com/openimsdk/protocol v0.0.72-alpha.69 - github.com/openimsdk/tools v0.0.50-alpha.62 + github.com/openimsdk/tools v0.0.50-alpha.64 github.com/pkg/errors v0.9.1 // indirect github.com/prometheus/client_golang v1.18.0 github.com/stretchr/testify v1.9.0 diff --git a/go.sum b/go.sum index d207325e9..cb594472b 100644 --- a/go.sum +++ b/go.sum @@ -349,8 +349,8 @@ github.com/openimsdk/gomake v0.0.15-alpha.2 h1:5Q8yl8ezy2yx+q8/ucU/t4kJnDfCzNOrk github.com/openimsdk/gomake v0.0.15-alpha.2/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI= github.com/openimsdk/protocol v0.0.72-alpha.69 h1:b22oY2XTdBR/BePqA73KsrM3GDF3Vk8YcBEXZU4ArJc= github.com/openimsdk/protocol v0.0.72-alpha.69/go.mod h1:Iet+piS/jaS+kWWyj6EEr36mk4ISzIRYjoMSVA4dq2M= -github.com/openimsdk/tools v0.0.50-alpha.62 h1:e/m1XL7+EXbkOoxr/En/612WcOPKOUHPBj0++gG6MuQ= -github.com/openimsdk/tools v0.0.50-alpha.62/go.mod h1:JowL2jYr8tu4vcQe+5hJh4v3BtSx1T0CIS3pgU/Mw+U= +github.com/openimsdk/tools v0.0.50-alpha.64 h1:KmtE8V2K8atQJJg1xq2ySSrPQyf8ldwk8fw6jRnsxCw= +github.com/openimsdk/tools v0.0.50-alpha.64/go.mod h1:B+oqV0zdewN7OiEHYJm+hW+8/Te7B8tHHgD8rK5ZLZk= github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM= github.com/pelletier/go-toml/v2 v2.2.2/go.mod h1:1t835xjRzz80PqgE6HHgN2JOsmgYu/h4qDAS4n929Rs= github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ= diff --git a/internal/api/init.go b/internal/api/init.go index 780ecb913..20237ebc2 100644 --- a/internal/api/init.go +++ b/internal/api/init.go @@ -58,7 +58,9 @@ func Start(ctx context.Context, index int, config *Config) error { config.RuntimeEnv = runtimeenv.PrintRuntimeEnvironment() - client, err := kdisc.NewDiscoveryRegister(&config.Discovery, config.RuntimeEnv) + client, err := kdisc.NewDiscoveryRegister(&config.Discovery, config.RuntimeEnv, []string{ + config.Discovery.RpcService.MessageGateway, + }) if err != nil { return errs.WrapMsg(err, "failed to register discovery service") } diff --git a/internal/msggateway/hub_server.go b/internal/msggateway/hub_server.go index 52afe495b..dda90f1c8 100644 --- a/internal/msggateway/hub_server.go +++ b/internal/msggateway/hub_server.go @@ -16,9 +16,10 @@ package msggateway import ( "context" - "github.com/openimsdk/open-im-server/v3/pkg/rpcli" "sync/atomic" + "github.com/openimsdk/open-im-server/v3/pkg/rpcli" + "github.com/openimsdk/open-im-server/v3/pkg/authverify" "github.com/openimsdk/open-im-server/v3/pkg/common/servererrs" "github.com/openimsdk/open-im-server/v3/pkg/common/startrpc" @@ -64,6 +65,9 @@ func (s *Server) Start(ctx context.Context, index int, conf *Config) error { conf.WebhooksConfig.GetConfigFileName(), conf.RedisConfig.GetConfigFileName(), }, + []string{ + conf.Discovery.RpcService.MessageGateway, + }, s.InitServer, ) } diff --git a/internal/msgtransfer/init.go b/internal/msgtransfer/init.go index fcd6152dc..96e6bbde0 100644 --- a/internal/msgtransfer/init.go +++ b/internal/msgtransfer/init.go @@ -87,7 +87,7 @@ func Start(ctx context.Context, index int, config *Config) error { if err != nil { return err } - client, err := discRegister.NewDiscoveryRegister(&config.Discovery, runTimeEnv) + client, err := discRegister.NewDiscoveryRegister(&config.Discovery, runTimeEnv, nil) if err != nil { return err } diff --git a/internal/tools/cron_task.go b/internal/tools/cron_task.go index 71fd886f6..da1c6320e 100644 --- a/internal/tools/cron_task.go +++ b/internal/tools/cron_task.go @@ -2,11 +2,14 @@ package tools import ( "context" + "github.com/openimsdk/open-im-server/v3/pkg/common/config" kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discovery" + disetcd "github.com/openimsdk/open-im-server/v3/pkg/common/discovery/etcd" pbconversation "github.com/openimsdk/protocol/conversation" "github.com/openimsdk/protocol/msg" "github.com/openimsdk/protocol/third" + "github.com/openimsdk/tools/discovery/etcd" "github.com/openimsdk/tools/mcontext" "github.com/openimsdk/tools/mw" @@ -27,38 +30,47 @@ type CronTaskConfig struct { runTimeEnv string } -func Start(ctx context.Context, config *CronTaskConfig) error { - config.runTimeEnv = runtimeenv.PrintRuntimeEnvironment() +func Start(ctx context.Context, conf *CronTaskConfig) error { + conf.runTimeEnv = runtimeenv.PrintRuntimeEnvironment() - log.CInfo(ctx, "CRON-TASK server is initializing", "runTimeEnv", config.runTimeEnv, "chatRecordsClearTime", config.CronTask.CronExecuteTime, "msgDestructTime", config.CronTask.RetainChatRecords) - if config.CronTask.RetainChatRecords < 1 { + log.CInfo(ctx, "CRON-TASK server is initializing", "runTimeEnv", conf.runTimeEnv, "chatRecordsClearTime", conf.CronTask.CronExecuteTime, "msgDestructTime", conf.CronTask.RetainChatRecords) + if conf.CronTask.RetainChatRecords < 1 { return errs.New("msg destruct time must be greater than 1").Wrap() } - client, err := kdisc.NewDiscoveryRegister(&config.Discovery, config.runTimeEnv) + client, err := kdisc.NewDiscoveryRegister(&conf.Discovery, conf.runTimeEnv, nil) if err != nil { return errs.WrapMsg(err, "failed to register discovery service") } client.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials())) - ctx = mcontext.SetOpUserID(ctx, config.Share.IMAdminUserID[0]) + ctx = mcontext.SetOpUserID(ctx, conf.Share.IMAdminUserID[0]) - msgConn, err := client.GetConn(ctx, config.Discovery.RpcService.Msg) + msgConn, err := client.GetConn(ctx, conf.Discovery.RpcService.Msg) if err != nil { return err } - thirdConn, err := client.GetConn(ctx, config.Discovery.RpcService.Third) + thirdConn, err := client.GetConn(ctx, conf.Discovery.RpcService.Third) if err != nil { return err } - conversationConn, err := client.GetConn(ctx, config.Discovery.RpcService.Conversation) + conversationConn, err := client.GetConn(ctx, conf.Discovery.RpcService.Conversation) if err != nil { return err } + if conf.Discovery.Enable == config.ETCD { + cm := disetcd.NewConfigManager(client.(*etcd.SvcDiscoveryRegistryImpl).GetClient(), []string{ + conf.CronTask.GetConfigFileName(), + conf.Share.GetConfigFileName(), + conf.Discovery.GetConfigFileName(), + }) + cm.Watch(ctx) + } + srv := &cronServer{ ctx: ctx, - config: config, + config: conf, cron: cron.New(), msgClient: msg.NewMsgClient(msgConn), conversationClient: pbconversation.NewConversationClient(conversationConn), @@ -74,7 +86,7 @@ func Start(ctx context.Context, config *CronTaskConfig) error { if err := srv.registerClearUserMsg(); err != nil { return err } - log.ZDebug(ctx, "start cron task", "CronExecuteTime", config.CronTask.CronExecuteTime) + log.ZDebug(ctx, "start cron task", "CronExecuteTime", conf.CronTask.CronExecuteTime) srv.cron.Start() <-ctx.Done() return nil diff --git a/pkg/common/cmd/auth.go b/pkg/common/cmd/auth.go index a5ab3fea7..b09d4153f 100644 --- a/pkg/common/cmd/auth.go +++ b/pkg/common/cmd/auth.go @@ -64,5 +64,8 @@ func (a *AuthRpcCmd) runE() error { a.authConfig.RedisConfig.GetConfigFileName(), a.authConfig.Discovery.GetConfigFileName(), }, + []string{ + a.authConfig.Discovery.RpcService.MessageGateway, + }, auth.Start) } diff --git a/pkg/common/cmd/conversation.go b/pkg/common/cmd/conversation.go index 12c29a873..2f8769897 100644 --- a/pkg/common/cmd/conversation.go +++ b/pkg/common/cmd/conversation.go @@ -68,6 +68,6 @@ func (a *ConversationRpcCmd) runE() error { a.conversationConfig.Share.GetConfigFileName(), a.conversationConfig.LocalCacheConfig.GetConfigFileName(), a.conversationConfig.Discovery.GetConfigFileName(), - }, + }, nil, conversation.Start) } diff --git a/pkg/common/cmd/friend.go b/pkg/common/cmd/friend.go index 209d481bb..dd850cf17 100644 --- a/pkg/common/cmd/friend.go +++ b/pkg/common/cmd/friend.go @@ -70,6 +70,6 @@ func (a *FriendRpcCmd) runE() error { a.relationConfig.WebhooksConfig.GetConfigFileName(), a.relationConfig.LocalCacheConfig.GetConfigFileName(), a.relationConfig.Discovery.GetConfigFileName(), - }, + }, nil, relation.Start) } diff --git a/pkg/common/cmd/group.go b/pkg/common/cmd/group.go index 23fd460f7..7a599077f 100644 --- a/pkg/common/cmd/group.go +++ b/pkg/common/cmd/group.go @@ -71,6 +71,6 @@ func (a *GroupRpcCmd) runE() error { a.groupConfig.WebhooksConfig.GetConfigFileName(), a.groupConfig.LocalCacheConfig.GetConfigFileName(), a.groupConfig.Discovery.GetConfigFileName(), - }, + }, nil, group.Start, versionctx.EnableVersionCtx()) } diff --git a/pkg/common/cmd/msg.go b/pkg/common/cmd/msg.go index b6f0b6131..c4049be05 100644 --- a/pkg/common/cmd/msg.go +++ b/pkg/common/cmd/msg.go @@ -72,6 +72,6 @@ func (a *MsgRpcCmd) runE() error { a.msgConfig.WebhooksConfig.GetConfigFileName(), a.msgConfig.LocalCacheConfig.GetConfigFileName(), a.msgConfig.Discovery.GetConfigFileName(), - }, + }, nil, msg.Start) } diff --git a/pkg/common/cmd/push.go b/pkg/common/cmd/push.go index 41b9d56e6..c4ae84952 100644 --- a/pkg/common/cmd/push.go +++ b/pkg/common/cmd/push.go @@ -72,5 +72,8 @@ func (a *PushRpcCmd) runE() error { a.pushConfig.LocalCacheConfig.GetConfigFileName(), a.pushConfig.Discovery.GetConfigFileName(), }, + []string{ + a.pushConfig.Discovery.RpcService.MessageGateway, + }, push.Start) } diff --git a/pkg/common/cmd/root.go b/pkg/common/cmd/root.go index 2a6f370b9..0a405fb6e 100644 --- a/pkg/common/cmd/root.go +++ b/pkg/common/cmd/root.go @@ -93,7 +93,7 @@ func (r *RootCmd) initEtcd() error { return err } if disConfig.Enable == config.ETCD { - discov, _ := kdisc.NewDiscoveryRegister(&disConfig, env) + discov, _ := kdisc.NewDiscoveryRegister(&disConfig, env, nil) r.etcdClient = discov.(*etcd.SvcDiscoveryRegistryImpl).GetClient() } return nil diff --git a/pkg/common/cmd/third.go b/pkg/common/cmd/third.go index 5086116b5..e567234e4 100644 --- a/pkg/common/cmd/third.go +++ b/pkg/common/cmd/third.go @@ -70,6 +70,6 @@ func (a *ThirdRpcCmd) runE() error { a.thirdConfig.MinioConfig.GetConfigFileName(), a.thirdConfig.LocalCacheConfig.GetConfigFileName(), a.thirdConfig.Discovery.GetConfigFileName(), - }, + }, nil, third.Start) } diff --git a/pkg/common/cmd/user.go b/pkg/common/cmd/user.go index 61125e0c3..190f6f892 100644 --- a/pkg/common/cmd/user.go +++ b/pkg/common/cmd/user.go @@ -72,6 +72,6 @@ func (a *UserRpcCmd) runE() error { a.userConfig.WebhooksConfig.GetConfigFileName(), a.userConfig.LocalCacheConfig.GetConfigFileName(), a.userConfig.Discovery.GetConfigFileName(), - }, + }, nil, user.Start) } diff --git a/pkg/common/discovery/discoveryregister.go b/pkg/common/discovery/discoveryregister.go index bc9fd0f5a..1b64c3e78 100644 --- a/pkg/common/discovery/discoveryregister.go +++ b/pkg/common/discovery/discoveryregister.go @@ -28,7 +28,7 @@ import ( ) // NewDiscoveryRegister creates a new service discovery and registry client based on the provided environment type. -func NewDiscoveryRegister(discovery *config.Discovery, runtimeEnv string) (discovery.SvcDiscoveryRegistry, error) { +func NewDiscoveryRegister(discovery *config.Discovery, runtimeEnv string, watchNames []string) (discovery.SvcDiscoveryRegistry, error) { if runtimeEnv == config.KUBERNETES { return kubernetes.NewKubernetesConnManager(discovery.Kubernetes.Namespace, grpc.WithDefaultCallOptions( @@ -42,6 +42,7 @@ func NewDiscoveryRegister(discovery *config.Discovery, runtimeEnv string) (disco return etcd.NewSvcDiscoveryRegistry( discovery.Etcd.RootDirectory, discovery.Etcd.Address, + watchNames, etcd.WithDialTimeout(10*time.Second), etcd.WithMaxCallSendMsgSize(20*1024*1024), etcd.WithUsernameAndPassword(discovery.Etcd.Username, discovery.Etcd.Password)) diff --git a/pkg/common/startrpc/start.go b/pkg/common/startrpc/start.go index 5facc8f73..27aabca95 100644 --- a/pkg/common/startrpc/start.go +++ b/pkg/common/startrpc/start.go @@ -49,7 +49,7 @@ import ( // Start rpc server. func Start[T any](ctx context.Context, discovery *conf.Discovery, prometheusConfig *conf.Prometheus, listenIP, registerIP string, autoSetPorts bool, rpcPorts []int, index int, rpcRegisterName string, notification *conf.Notification, config T, - watchConfigNames []string, + watchConfigNames []string, watchServiceNames []string, rpcFn func(ctx context.Context, config T, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error, options ...grpc.ServerOption) error { @@ -95,7 +95,7 @@ func Start[T any](ctx context.Context, discovery *conf.Discovery, prometheusConf if autoSetPorts && discovery.Enable != conf.ETCD { return errs.New("only etcd support autoSetPorts", "rpcRegisterName", rpcRegisterName).Wrap() } - client, err := kdisc.NewDiscoveryRegister(discovery, runTimeEnv) + client, err := kdisc.NewDiscoveryRegister(discovery, runTimeEnv, watchServiceNames) if err != nil { return err }