From bbbf17cbe4330ba0fcac45d28f129f33a4fc4b7e Mon Sep 17 00:00:00 2001 From: icey-yu <119291641+icey-yu@users.noreply.github.com> Date: Sat, 7 Dec 2024 12:11:12 +0800 Subject: [PATCH] feat: config (#2934) --- config/discovery.yml | 11 ++++++++++ internal/api/init.go | 2 +- internal/api/router.go | 14 ++++++------ internal/msggateway/hub_server.go | 4 ++-- internal/msggateway/message_handler.go | 2 +- internal/msggateway/ws_server.go | 10 ++++----- internal/msgtransfer/init.go | 6 ++--- internal/push/onlinepusher.go | 2 +- internal/push/push_handler.go | 10 ++++----- internal/rpc/auth/auth.go | 4 ++-- internal/rpc/conversation/conversation.go | 6 ++--- internal/rpc/group/group.go | 6 ++--- internal/rpc/msg/server.go | 8 +++---- internal/rpc/relation/friend.go | 6 ++--- internal/rpc/third/third.go | 2 +- internal/rpc/user/user.go | 6 ++--- internal/tools/cron_task.go | 22 ++++--------------- pkg/common/cmd/auth.go | 2 +- pkg/common/cmd/conversation.go | 2 +- pkg/common/cmd/friend.go | 2 +- pkg/common/cmd/group.go | 2 +- pkg/common/cmd/msg.go | 2 +- pkg/common/cmd/push.go | 2 +- pkg/common/cmd/third.go | 2 +- pkg/common/cmd/user.go | 2 +- pkg/common/config/config.go | 17 +++++++------- .../discoveryregister/discoveryregister.go | 14 ++---------- pkg/common/prommetrics/rpc.go | 12 +++++----- pkg/common/startrpc/start.go | 4 ++-- tools/check-component/main.go | 4 ---- 30 files changed, 85 insertions(+), 103 deletions(-) diff --git a/config/discovery.yml b/config/discovery.yml index 6e68cbff4..fad49e3ed 100644 --- a/config/discovery.yml +++ b/config/discovery.yml @@ -5,4 +5,15 @@ etcd: username: '' password: '' +rpcService: + user: user-rpc-service + friend: friend-rpc-service + msg: msg-rpc-service + push: push-rpc-service + messageGateway: messageGateway-rpc-service + group: group-rpc-service + auth: auth-rpc-service + conversation: conversation-rpc-service + third: third-rpc-service + diff --git a/internal/api/init.go b/internal/api/init.go index e83dfc2ea..6c25609f6 100644 --- a/internal/api/init.go +++ b/internal/api/init.go @@ -51,7 +51,7 @@ func Start(ctx context.Context, index int, config *Config) error { var client discovery.SvcDiscoveryRegistry // Determine whether zk is passed according to whether it is a clustered deployment - client, err = kdisc.NewDiscoveryRegister(&config.Discovery, &config.Share) + client, err = kdisc.NewDiscoveryRegister(&config.Discovery) if err != nil { return errs.WrapMsg(err, "failed to register discovery service") } diff --git a/internal/api/router.go b/internal/api/router.go index 8e4d17ef1..52d26bdc5 100644 --- a/internal/api/router.go +++ b/internal/api/router.go @@ -57,14 +57,14 @@ func newGinRouter(disCov discovery.SvcDiscoveryRegistry, config *Config) *gin.En _ = v.RegisterValidation("required_if", RequiredIf) } // init rpc client here - userRpc := rpcclient.NewUser(disCov, config.Share.RpcRegisterName.User, config.Share.RpcRegisterName.MessageGateway, + userRpc := rpcclient.NewUser(disCov, config.Discovery.RpcService.User, config.Discovery.RpcService.MessageGateway, config.Share.IMAdminUserID) - groupRpc := rpcclient.NewGroup(disCov, config.Share.RpcRegisterName.Group) - friendRpc := rpcclient.NewFriend(disCov, config.Share.RpcRegisterName.Friend) - messageRpc := rpcclient.NewMessage(disCov, config.Share.RpcRegisterName.Msg) - conversationRpc := rpcclient.NewConversation(disCov, config.Share.RpcRegisterName.Conversation) - authRpc := rpcclient.NewAuth(disCov, config.Share.RpcRegisterName.Auth) - thirdRpc := rpcclient.NewThird(disCov, config.Share.RpcRegisterName.Third, config.API.Prometheus.GrafanaURL) + groupRpc := rpcclient.NewGroup(disCov, config.Discovery.RpcService.Group) + friendRpc := rpcclient.NewFriend(disCov, config.Discovery.RpcService.Friend) + messageRpc := rpcclient.NewMessage(disCov, config.Discovery.RpcService.Msg) + conversationRpc := rpcclient.NewConversation(disCov, config.Discovery.RpcService.Conversation) + authRpc := rpcclient.NewAuth(disCov, config.Discovery.RpcService.Auth) + thirdRpc := rpcclient.NewThird(disCov, config.Discovery.RpcService.Third, config.API.Prometheus.GrafanaURL) switch config.API.Api.CompressionLevel { case NoCompression: case DefaultCompression: diff --git a/internal/msggateway/hub_server.go b/internal/msggateway/hub_server.go index b6760ed0f..8e1edbec7 100644 --- a/internal/msggateway/hub_server.go +++ b/internal/msggateway/hub_server.go @@ -37,7 +37,7 @@ import ( func (s *Server) InitServer(ctx context.Context, config *Config, disCov discovery.SvcDiscoveryRegistry, server *grpc.Server) error { s.LongConnServer.SetDiscoveryRegistry(disCov, config) msggateway.RegisterMsgGatewayServer(server, s) - s.userRcp = rpcclient.NewUserRpcClient(disCov, config.Share.RpcRegisterName.User, config.Share.IMAdminUserID) + s.userRcp = rpcclient.NewUserRpcClient(disCov, config.Discovery.RpcService.User, config.Share.IMAdminUserID) if s.ready != nil { return s.ready(s) } @@ -48,7 +48,7 @@ func (s *Server) Start(ctx context.Context, index int, conf *Config) error { return startrpc.Start(ctx, &conf.Discovery, &conf.MsgGateway.Prometheus, conf.MsgGateway.ListenIP, conf.MsgGateway.RPC.RegisterIP, conf.MsgGateway.RPC.AutoSetPorts, conf.MsgGateway.RPC.Ports, index, - conf.Share.RpcRegisterName.MessageGateway, + conf.Discovery.RpcService.MessageGateway, &conf.Share, conf, s.InitServer, diff --git a/internal/msggateway/message_handler.go b/internal/msggateway/message_handler.go index 5407ba90c..f2b0ce9da 100644 --- a/internal/msggateway/message_handler.go +++ b/internal/msggateway/message_handler.go @@ -120,7 +120,7 @@ type GrpcHandler struct { validate *validator.Validate } -func NewGrpcHandler(validate *validator.Validate, client discovery.SvcDiscoveryRegistry, rpcRegisterName *config.RpcRegisterName) *GrpcHandler { +func NewGrpcHandler(validate *validator.Validate, client discovery.SvcDiscoveryRegistry, rpcRegisterName *config.RpcService) *GrpcHandler { msgRpcClient := rpcclient.NewMessageRpcClient(client, rpcRegisterName.Msg) pushRpcClient := rpcclient.NewPushRpcClient(client, rpcRegisterName.Push) return &GrpcHandler{ diff --git a/internal/msggateway/ws_server.go b/internal/msggateway/ws_server.go index e6b4f3fa4..335556ae9 100644 --- a/internal/msggateway/ws_server.go +++ b/internal/msggateway/ws_server.go @@ -72,9 +72,9 @@ type kickHandler struct { } func (ws *WsServer) SetDiscoveryRegistry(disCov discovery.SvcDiscoveryRegistry, config *Config) { - ws.MessageHandler = NewGrpcHandler(ws.validate, disCov, &config.Share.RpcRegisterName) - u := rpcclient.NewUserRpcClient(disCov, config.Share.RpcRegisterName.User, config.Share.IMAdminUserID) - ws.authClient = rpcclient.NewAuth(disCov, config.Share.RpcRegisterName.Auth) + ws.MessageHandler = NewGrpcHandler(ws.validate, disCov, &config.Discovery.RpcService) + u := rpcclient.NewUserRpcClient(disCov, config.Discovery.RpcService.User, config.Share.IMAdminUserID) + ws.authClient = rpcclient.NewAuth(disCov, config.Discovery.RpcService.Auth) ws.userClient = &u ws.disCov = disCov } @@ -113,7 +113,7 @@ func NewWsServer(msgGatewayConfig *Config, opts ...Option) *WsServer { for _, o := range opts { o(&config) } - //userRpcClient := rpcclient.NewUserRpcClient(client, config.Share.RpcRegisterName.User, config.Share.IMAdminUserID) + //userRpcClient := rpcclient.NewUserRpcClient(client, config.Discovery.RpcService.User, config.Share.IMAdminUserID) v := validator.New() return &WsServer{ @@ -192,7 +192,7 @@ func (ws *WsServer) Run(done chan error) error { var concurrentRequest = 3 func (ws *WsServer) sendUserOnlineInfoToOtherNode(ctx context.Context, client *Client) error { - conns, err := ws.disCov.GetConns(ctx, ws.msgGatewayConfig.Share.RpcRegisterName.MessageGateway) + conns, err := ws.disCov.GetConns(ctx, ws.msgGatewayConfig.Discovery.RpcService.MessageGateway) if err != nil { return err } diff --git a/internal/msgtransfer/init.go b/internal/msgtransfer/init.go index 92053931c..b65532a8e 100644 --- a/internal/msgtransfer/init.go +++ b/internal/msgtransfer/init.go @@ -75,7 +75,7 @@ func Start(ctx context.Context, index int, config *Config) error { if err != nil { return err } - client, err := discRegister.NewDiscoveryRegister(&config.Discovery, &config.Share) + client, err := discRegister.NewDiscoveryRegister(&config.Discovery) if err != nil { return err } @@ -101,8 +101,8 @@ func Start(ctx context.Context, index int, config *Config) error { if err != nil { return err } - conversationRpcClient := rpcclient.NewConversationRpcClient(client, config.Share.RpcRegisterName.Conversation) - groupRpcClient := rpcclient.NewGroupRpcClient(client, config.Share.RpcRegisterName.Group) + conversationRpcClient := rpcclient.NewConversationRpcClient(client, config.Discovery.RpcService.Conversation) + groupRpcClient := rpcclient.NewGroupRpcClient(client, config.Discovery.RpcService.Group) historyCH, err := NewOnlineHistoryRedisConsumerHandler(&config.KafkaConfig, msgTransferDatabase, &conversationRpcClient, &groupRpcClient) if err != nil { return err diff --git a/internal/push/onlinepusher.go b/internal/push/onlinepusher.go index 9521a84a0..f1ff7fed5 100644 --- a/internal/push/onlinepusher.go +++ b/internal/push/onlinepusher.go @@ -60,7 +60,7 @@ func NewDefaultAllNode(disCov discovery.SvcDiscoveryRegistry, config *Config) *D func (d *DefaultAllNode) GetConnsAndOnlinePush(ctx context.Context, msg *sdkws.MsgData, pushToUserIDs []string) (wsResults []*msggateway.SingleMsgToUserResults, err error) { - conns, err := d.disCov.GetConns(ctx, d.config.Share.RpcRegisterName.MessageGateway) + conns, err := d.disCov.GetConns(ctx, d.config.Discovery.RpcService.MessageGateway) if len(conns) == 0 { log.ZWarn(ctx, "get gateway conn 0 ", nil) } else { diff --git a/internal/push/push_handler.go b/internal/push/push_handler.go index c1d1ac2f9..ee3dc5b84 100644 --- a/internal/push/push_handler.go +++ b/internal/push/push_handler.go @@ -58,14 +58,14 @@ func NewConsumerHandler(config *Config, database controller.PushDatabase, offlin return nil, err } - userRpcClient := rpcclient.NewUserRpcClient(client, config.Share.RpcRegisterName.User, config.Share.IMAdminUserID) + userRpcClient := rpcclient.NewUserRpcClient(client, config.Discovery.RpcService.User, config.Share.IMAdminUserID) consumerHandler.offlinePusher = offlinePusher consumerHandler.onlinePusher = NewOnlinePusher(client, config) - consumerHandler.groupRpcClient = rpcclient.NewGroupRpcClient(client, config.Share.RpcRegisterName.Group) + consumerHandler.groupRpcClient = rpcclient.NewGroupRpcClient(client, config.Discovery.RpcService.Group) consumerHandler.groupLocalCache = rpccache.NewGroupLocalCache(consumerHandler.groupRpcClient, &config.LocalCacheConfig, rdb) - consumerHandler.msgRpcClient = rpcclient.NewMessageRpcClient(client, config.Share.RpcRegisterName.Msg) - consumerHandler.conversationRpcClient = rpcclient.NewConversationRpcClient(client, config.Share.RpcRegisterName.Conversation) + consumerHandler.msgRpcClient = rpcclient.NewMessageRpcClient(client, config.Discovery.RpcService.Msg) + consumerHandler.conversationRpcClient = rpcclient.NewConversationRpcClient(client, config.Discovery.RpcService.Conversation) consumerHandler.conversationLocalCache = rpccache.NewConversationLocalCache(consumerHandler.conversationRpcClient, &config.LocalCacheConfig, rdb) consumerHandler.webhookClient = webhook.NewWebhookClient(config.WebhooksConfig.URL) consumerHandler.config = config @@ -414,7 +414,7 @@ func (c *ConsumerHandler) DeleteMemberAndSetConversationSeq(ctx context.Context, if err != nil { return err } - + return c.conversationRpcClient.SetConversationMaxSeq(ctx, userIDs, conversationID, maxSeq) } diff --git a/internal/rpc/auth/auth.go b/internal/rpc/auth/auth.go index f0f4a022d..c9cf8d345 100644 --- a/internal/rpc/auth/auth.go +++ b/internal/rpc/auth/auth.go @@ -59,7 +59,7 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg if err != nil { return err } - userRpcClient := rpcclient.NewUserRpcClient(client, config.Share.RpcRegisterName.User, config.Share.IMAdminUserID) + userRpcClient := rpcclient.NewUserRpcClient(client, config.Discovery.RpcService.User, config.Share.IMAdminUserID) pbauth.RegisterAuthServer(server, &authServer{ userRpcClient: &userRpcClient, RegisterCenter: client, @@ -182,7 +182,7 @@ func (s *authServer) ForceLogout(ctx context.Context, req *pbauth.ForceLogoutReq } func (s *authServer) forceKickOff(ctx context.Context, userID string, platformID int32) error { - conns, err := s.RegisterCenter.GetConns(ctx, s.config.Share.RpcRegisterName.MessageGateway) + conns, err := s.RegisterCenter.GetConns(ctx, s.config.Discovery.RpcService.MessageGateway) if err != nil { return err } diff --git a/internal/rpc/conversation/conversation.go b/internal/rpc/conversation/conversation.go index 6a8a6d800..7345c965d 100644 --- a/internal/rpc/conversation/conversation.go +++ b/internal/rpc/conversation/conversation.go @@ -78,9 +78,9 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg if err != nil { return err } - groupRpcClient := rpcclient.NewGroupRpcClient(client, config.Share.RpcRegisterName.Group) - msgRpcClient := rpcclient.NewMessageRpcClient(client, config.Share.RpcRegisterName.Msg) - userRpcClient := rpcclient.NewUserRpcClient(client, config.Share.RpcRegisterName.User, config.Share.IMAdminUserID) + groupRpcClient := rpcclient.NewGroupRpcClient(client, config.Discovery.RpcService.Group) + msgRpcClient := rpcclient.NewMessageRpcClient(client, config.Discovery.RpcService.Msg) + userRpcClient := rpcclient.NewUserRpcClient(client, config.Discovery.RpcService.User, config.Share.IMAdminUserID) localcache.InitLocalCache(&config.LocalCacheConfig) pbconversation.RegisterConversationServer(server, &conversationServer{ msgRpcClient: &msgRpcClient, diff --git a/internal/rpc/group/group.go b/internal/rpc/group/group.go index 9e610df0f..62020f980 100644 --- a/internal/rpc/group/group.go +++ b/internal/rpc/group/group.go @@ -99,9 +99,9 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg if err != nil { return err } - userRpcClient := rpcclient.NewUserRpcClient(client, config.Share.RpcRegisterName.User, config.Share.IMAdminUserID) - msgRpcClient := rpcclient.NewMessageRpcClient(client, config.Share.RpcRegisterName.Msg) - conversationRpcClient := rpcclient.NewConversationRpcClient(client, config.Share.RpcRegisterName.Conversation) + userRpcClient := rpcclient.NewUserRpcClient(client, config.Discovery.RpcService.User, config.Share.IMAdminUserID) + msgRpcClient := rpcclient.NewMessageRpcClient(client, config.Discovery.RpcService.Msg) + conversationRpcClient := rpcclient.NewConversationRpcClient(client, config.Discovery.RpcService.Conversation) var gs groupServer database := controller.NewGroupDatabase(rdb, &config.LocalCacheConfig, groupDB, groupMemberDB, groupRequestDB, mgocli.GetTx(), grouphash.NewGroupHashFromGroupServer(&gs)) gs.db = database diff --git a/internal/rpc/msg/server.go b/internal/rpc/msg/server.go index 758a7cf19..b0cd771a4 100644 --- a/internal/rpc/msg/server.go +++ b/internal/rpc/msg/server.go @@ -89,10 +89,10 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg return err } msgModel := redis.NewMsgCache(rdb) - conversationClient := rpcclient.NewConversationRpcClient(client, config.Share.RpcRegisterName.Conversation) - userRpcClient := rpcclient.NewUserRpcClient(client, config.Share.RpcRegisterName.User, config.Share.IMAdminUserID) - groupRpcClient := rpcclient.NewGroupRpcClient(client, config.Share.RpcRegisterName.Group) - friendRpcClient := rpcclient.NewFriendRpcClient(client, config.Share.RpcRegisterName.Friend) + conversationClient := rpcclient.NewConversationRpcClient(client, config.Discovery.RpcService.Conversation) + userRpcClient := rpcclient.NewUserRpcClient(client, config.Discovery.RpcService.User, config.Share.IMAdminUserID) + groupRpcClient := rpcclient.NewGroupRpcClient(client, config.Discovery.RpcService.Group) + friendRpcClient := rpcclient.NewFriendRpcClient(client, config.Discovery.RpcService.Friend) seqConversation, err := mgo.NewSeqConversationMongo(mgocli.GetDB()) if err != nil { return err diff --git a/internal/rpc/relation/friend.go b/internal/rpc/relation/friend.go index 036c7aff5..617e31348 100644 --- a/internal/rpc/relation/friend.go +++ b/internal/rpc/relation/friend.go @@ -93,8 +93,8 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg } // Initialize RPC clients - userRpcClient := rpcclient.NewUserRpcClient(client, config.Share.RpcRegisterName.User, config.Share.IMAdminUserID) - msgRpcClient := rpcclient.NewMessageRpcClient(client, config.Share.RpcRegisterName.Msg) + userRpcClient := rpcclient.NewUserRpcClient(client, config.Discovery.RpcService.User, config.Share.IMAdminUserID) + msgRpcClient := rpcclient.NewMessageRpcClient(client, config.Discovery.RpcService.Msg) // Initialize notification sender notificationSender := NewFriendNotificationSender( @@ -119,7 +119,7 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg userRpcClient: &userRpcClient, notificationSender: notificationSender, RegisterCenter: client, - conversationRpcClient: rpcclient.NewConversationRpcClient(client, config.Share.RpcRegisterName.Conversation), + conversationRpcClient: rpcclient.NewConversationRpcClient(client, config.Discovery.RpcService.Conversation), config: config, webhookClient: webhook.NewWebhookClient(config.WebhooksConfig.URL), queue: memamq.NewMemoryQueue(16, 1024*1024), diff --git a/internal/rpc/third/third.go b/internal/rpc/third/third.go index 4206a2d6f..75076ab74 100644 --- a/internal/rpc/third/third.go +++ b/internal/rpc/third/third.go @@ -101,7 +101,7 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg localcache.InitLocalCache(&config.LocalCacheConfig) third.RegisterThirdServer(server, &thirdServer{ thirdDatabase: controller.NewThirdDatabase(redis.NewThirdCache(rdb), logdb), - userRpcClient: rpcclient.NewUserRpcClient(client, config.Share.RpcRegisterName.User, config.Share.IMAdminUserID), + userRpcClient: rpcclient.NewUserRpcClient(client, config.Discovery.RpcService.User, config.Share.IMAdminUserID), s3dataBase: controller.NewS3Database(rdb, o, s3db), defaultExpire: time.Hour * 24 * 7, config: config, diff --git a/internal/rpc/user/user.go b/internal/rpc/user/user.go index 2dfbb01df..ae02b997f 100644 --- a/internal/rpc/user/user.go +++ b/internal/rpc/user/user.go @@ -96,9 +96,9 @@ func Start(ctx context.Context, config *Config, client registry.SvcDiscoveryRegi } userCache := redis.NewUserCacheRedis(rdb, &config.LocalCacheConfig, userDB, redis.GetRocksCacheOptions()) database := controller.NewUserDatabase(userDB, userCache, mgocli.GetTx()) - friendRpcClient := rpcclient.NewFriendRpcClient(client, config.Share.RpcRegisterName.Friend) - groupRpcClient := rpcclient.NewGroupRpcClient(client, config.Share.RpcRegisterName.Group) - msgRpcClient := rpcclient.NewMessageRpcClient(client, config.Share.RpcRegisterName.Msg) + friendRpcClient := rpcclient.NewFriendRpcClient(client, config.Discovery.RpcService.Friend) + groupRpcClient := rpcclient.NewGroupRpcClient(client, config.Discovery.RpcService.Group) + msgRpcClient := rpcclient.NewMessageRpcClient(client, config.Discovery.RpcService.Msg) localcache.InitLocalCache(&config.LocalCacheConfig) u := &userServer{ online: redis.NewUserOnline(rdb), diff --git a/internal/tools/cron_task.go b/internal/tools/cron_task.go index 4f87036c7..3864c43dc 100644 --- a/internal/tools/cron_task.go +++ b/internal/tools/cron_task.go @@ -1,17 +1,3 @@ -// Copyright © 2023 OpenIM. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - package tools import ( @@ -47,24 +33,24 @@ func Start(ctx context.Context, config *CronTaskConfig) error { if config.CronTask.RetainChatRecords < 1 { return errs.New("msg destruct time must be greater than 1").Wrap() } - client, err := kdisc.NewDiscoveryRegister(&config.Discovery, &config.Share) + client, err := kdisc.NewDiscoveryRegister(&config.Discovery) if err != nil { return errs.WrapMsg(err, "failed to register discovery service") } client.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials())) ctx = mcontext.SetOpUserID(ctx, config.Share.IMAdminUserID[0]) - msgConn, err := client.GetConn(ctx, config.Share.RpcRegisterName.Msg) + msgConn, err := client.GetConn(ctx, config.Discovery.RpcService.Msg) if err != nil { return err } - thirdConn, err := client.GetConn(ctx, config.Share.RpcRegisterName.Third) + thirdConn, err := client.GetConn(ctx, config.Discovery.RpcService.Third) if err != nil { return err } - conversationConn, err := client.GetConn(ctx, config.Share.RpcRegisterName.Conversation) + conversationConn, err := client.GetConn(ctx, config.Discovery.RpcService.Conversation) if err != nil { return err } diff --git a/pkg/common/cmd/auth.go b/pkg/common/cmd/auth.go index 34f450fb1..80a675ace 100644 --- a/pkg/common/cmd/auth.go +++ b/pkg/common/cmd/auth.go @@ -56,5 +56,5 @@ func (a *AuthRpcCmd) Exec() error { func (a *AuthRpcCmd) runE() error { return startrpc.Start(a.ctx, &a.authConfig.Discovery, &a.authConfig.RpcConfig.Prometheus, a.authConfig.RpcConfig.RPC.ListenIP, a.authConfig.RpcConfig.RPC.RegisterIP, a.authConfig.RpcConfig.RPC.AutoSetPorts, a.authConfig.RpcConfig.RPC.Ports, - a.Index(), a.authConfig.Share.RpcRegisterName.Auth, &a.authConfig.Share, a.authConfig, auth.Start) + a.Index(), a.authConfig.Discovery.RpcService.Auth, &a.authConfig.Share, a.authConfig, auth.Start) } diff --git a/pkg/common/cmd/conversation.go b/pkg/common/cmd/conversation.go index f8ec570a7..72fa694ed 100644 --- a/pkg/common/cmd/conversation.go +++ b/pkg/common/cmd/conversation.go @@ -58,5 +58,5 @@ func (a *ConversationRpcCmd) Exec() error { func (a *ConversationRpcCmd) runE() error { return startrpc.Start(a.ctx, &a.conversationConfig.Discovery, &a.conversationConfig.RpcConfig.Prometheus, a.conversationConfig.RpcConfig.RPC.ListenIP, a.conversationConfig.RpcConfig.RPC.RegisterIP, a.conversationConfig.RpcConfig.RPC.AutoSetPorts, a.conversationConfig.RpcConfig.RPC.Ports, - a.Index(), a.conversationConfig.Share.RpcRegisterName.Conversation, &a.conversationConfig.Share, a.conversationConfig, conversation.Start) + a.Index(), a.conversationConfig.Discovery.RpcService.Conversation, &a.conversationConfig.Share, a.conversationConfig, conversation.Start) } diff --git a/pkg/common/cmd/friend.go b/pkg/common/cmd/friend.go index 786be0d8e..b40449b2b 100644 --- a/pkg/common/cmd/friend.go +++ b/pkg/common/cmd/friend.go @@ -59,5 +59,5 @@ func (a *FriendRpcCmd) Exec() error { func (a *FriendRpcCmd) runE() error { return startrpc.Start(a.ctx, &a.relationConfig.Discovery, &a.relationConfig.RpcConfig.Prometheus, a.relationConfig.RpcConfig.RPC.ListenIP, a.relationConfig.RpcConfig.RPC.RegisterIP, a.relationConfig.RpcConfig.RPC.AutoSetPorts, a.relationConfig.RpcConfig.RPC.Ports, - a.Index(), a.relationConfig.Share.RpcRegisterName.Friend, &a.relationConfig.Share, a.relationConfig, relation.Start) + a.Index(), a.relationConfig.Discovery.RpcService.Friend, &a.relationConfig.Share, a.relationConfig, relation.Start) } diff --git a/pkg/common/cmd/group.go b/pkg/common/cmd/group.go index e405c9135..faed56233 100644 --- a/pkg/common/cmd/group.go +++ b/pkg/common/cmd/group.go @@ -60,5 +60,5 @@ func (a *GroupRpcCmd) Exec() error { func (a *GroupRpcCmd) runE() error { return startrpc.Start(a.ctx, &a.groupConfig.Discovery, &a.groupConfig.RpcConfig.Prometheus, a.groupConfig.RpcConfig.RPC.ListenIP, a.groupConfig.RpcConfig.RPC.RegisterIP, a.groupConfig.RpcConfig.RPC.AutoSetPorts, a.groupConfig.RpcConfig.RPC.Ports, - a.Index(), a.groupConfig.Share.RpcRegisterName.Group, &a.groupConfig.Share, a.groupConfig, group.Start, versionctx.EnableVersionCtx()) + a.Index(), a.groupConfig.Discovery.RpcService.Group, &a.groupConfig.Share, a.groupConfig, group.Start, versionctx.EnableVersionCtx()) } diff --git a/pkg/common/cmd/msg.go b/pkg/common/cmd/msg.go index 3f1a2fa31..962d1468a 100644 --- a/pkg/common/cmd/msg.go +++ b/pkg/common/cmd/msg.go @@ -60,5 +60,5 @@ func (a *MsgRpcCmd) Exec() error { func (a *MsgRpcCmd) runE() error { return startrpc.Start(a.ctx, &a.msgConfig.Discovery, &a.msgConfig.RpcConfig.Prometheus, a.msgConfig.RpcConfig.RPC.ListenIP, a.msgConfig.RpcConfig.RPC.RegisterIP, a.msgConfig.RpcConfig.RPC.AutoSetPorts, a.msgConfig.RpcConfig.RPC.Ports, - a.Index(), a.msgConfig.Share.RpcRegisterName.Msg, &a.msgConfig.Share, a.msgConfig, msg.Start) + a.Index(), a.msgConfig.Discovery.RpcService.Msg, &a.msgConfig.Share, a.msgConfig, msg.Start) } diff --git a/pkg/common/cmd/push.go b/pkg/common/cmd/push.go index e0f6b39f6..09883fc34 100644 --- a/pkg/common/cmd/push.go +++ b/pkg/common/cmd/push.go @@ -60,5 +60,5 @@ func (a *PushRpcCmd) Exec() error { func (a *PushRpcCmd) runE() error { return startrpc.Start(a.ctx, &a.pushConfig.Discovery, &a.pushConfig.RpcConfig.Prometheus, a.pushConfig.RpcConfig.RPC.ListenIP, a.pushConfig.RpcConfig.RPC.RegisterIP, a.pushConfig.RpcConfig.RPC.AutoSetPorts, a.pushConfig.RpcConfig.RPC.Ports, - a.Index(), a.pushConfig.Share.RpcRegisterName.Push, &a.pushConfig.Share, a.pushConfig, push.Start) + a.Index(), a.pushConfig.Discovery.RpcService.Push, &a.pushConfig.Share, a.pushConfig, push.Start) } diff --git a/pkg/common/cmd/third.go b/pkg/common/cmd/third.go index c5a21f91b..8e9ffb9bd 100644 --- a/pkg/common/cmd/third.go +++ b/pkg/common/cmd/third.go @@ -59,5 +59,5 @@ func (a *ThirdRpcCmd) Exec() error { func (a *ThirdRpcCmd) runE() error { return startrpc.Start(a.ctx, &a.thirdConfig.Discovery, &a.thirdConfig.RpcConfig.Prometheus, a.thirdConfig.RpcConfig.RPC.ListenIP, a.thirdConfig.RpcConfig.RPC.RegisterIP, a.thirdConfig.RpcConfig.RPC.AutoSetPorts, a.thirdConfig.RpcConfig.RPC.Ports, - a.Index(), a.thirdConfig.Share.RpcRegisterName.Third, &a.thirdConfig.Share, a.thirdConfig, third.Start) + a.Index(), a.thirdConfig.Discovery.RpcService.Third, &a.thirdConfig.Share, a.thirdConfig, third.Start) } diff --git a/pkg/common/cmd/user.go b/pkg/common/cmd/user.go index 1d916ea37..40f372770 100644 --- a/pkg/common/cmd/user.go +++ b/pkg/common/cmd/user.go @@ -60,5 +60,5 @@ func (a *UserRpcCmd) Exec() error { func (a *UserRpcCmd) runE() error { return startrpc.Start(a.ctx, &a.userConfig.Discovery, &a.userConfig.RpcConfig.Prometheus, a.userConfig.RpcConfig.RPC.ListenIP, a.userConfig.RpcConfig.RPC.RegisterIP, a.userConfig.RpcConfig.RPC.AutoSetPorts, a.userConfig.RpcConfig.RPC.Ports, - a.Index(), a.userConfig.Share.RpcRegisterName.User, &a.userConfig.Share, a.userConfig, user.Start) + a.Index(), a.userConfig.Discovery.RpcService.User, &a.userConfig.Share, a.userConfig, user.Start) } diff --git a/pkg/common/config/config.go b/pkg/common/config/config.go index ed803359c..6082b5f23 100644 --- a/pkg/common/config/config.go +++ b/pkg/common/config/config.go @@ -372,10 +372,9 @@ type AfterConfig struct { } type Share struct { - Secret string `mapstructure:"secret"` - RpcRegisterName RpcRegisterName `mapstructure:"rpcRegisterName"` - IMAdminUserID []string `mapstructure:"imAdminUserID"` - MultiLogin MultiLogin `mapstructure:"multiLogin"` + Secret string `mapstructure:"secret"` + IMAdminUserID []string `mapstructure:"imAdminUserID"` + MultiLogin MultiLogin `mapstructure:"multiLogin"` } type MultiLogin struct { @@ -383,7 +382,7 @@ type MultiLogin struct { MaxNumOneEnd int `mapstructure:"maxNumOneEnd"` } -type RpcRegisterName struct { +type RpcService struct { User string `mapstructure:"user"` Friend string `mapstructure:"friend"` Msg string `mapstructure:"msg"` @@ -395,7 +394,7 @@ type RpcRegisterName struct { Third string `mapstructure:"third"` } -func (r *RpcRegisterName) GetServiceNames() []string { +func (r *RpcService) GetServiceNames() []string { return []string{ r.User, r.Friend, @@ -470,9 +469,9 @@ type ZooKeeper struct { } type Discovery struct { - Enable string `mapstructure:"enable"` - Etcd Etcd `mapstructure:"etcd"` - ZooKeeper ZooKeeper `mapstructure:"zooKeeper"` + Enable string `mapstructure:"enable"` + Etcd Etcd `mapstructure:"etcd"` + RpcService RpcService `mapstructure:"rpcService"` } type Etcd struct { diff --git a/pkg/common/discoveryregister/discoveryregister.go b/pkg/common/discoveryregister/discoveryregister.go index 559c937c1..be82aa526 100644 --- a/pkg/common/discoveryregister/discoveryregister.go +++ b/pkg/common/discoveryregister/discoveryregister.go @@ -19,25 +19,15 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister/kubernetes" "github.com/openimsdk/tools/discovery" "github.com/openimsdk/tools/discovery/etcd" - "github.com/openimsdk/tools/discovery/zookeeper" "github.com/openimsdk/tools/errs" "time" ) // NewDiscoveryRegister creates a new service discovery and registry client based on the provided environment type. -func NewDiscoveryRegister(discovery *config.Discovery, share *config.Share) (discovery.SvcDiscoveryRegistry, error) { +func NewDiscoveryRegister(discovery *config.Discovery) (discovery.SvcDiscoveryRegistry, error) { switch discovery.Enable { - case "zookeeper": - return zookeeper.NewZkClient( - discovery.ZooKeeper.Address, - discovery.ZooKeeper.Schema, - zookeeper.WithFreq(time.Hour), - zookeeper.WithUserNameAndPassword(discovery.ZooKeeper.Username, discovery.ZooKeeper.Password), - zookeeper.WithRoundRobin(), - zookeeper.WithTimeout(10), - ) case "k8s": - return kubernetes.NewK8sDiscoveryRegister(share.RpcRegisterName.MessageGateway) + return kubernetes.NewK8sDiscoveryRegister(discovery.RpcService.MessageGateway) case "etcd": return etcd.NewSvcDiscoveryRegistry( discovery.Etcd.RootDirectory, diff --git a/pkg/common/prommetrics/rpc.go b/pkg/common/prommetrics/rpc.go index 7162fa7e8..809d509b2 100644 --- a/pkg/common/prommetrics/rpc.go +++ b/pkg/common/prommetrics/rpc.go @@ -42,25 +42,25 @@ func GetGrpcServerMetrics() *gp.ServerMetrics { return grpcMetrics } -func GetGrpcCusMetrics(registerName string, share *config.Share) []prometheus.Collector { +func GetGrpcCusMetrics(registerName string, discovery *config.Discovery) []prometheus.Collector { switch registerName { - case share.RpcRegisterName.MessageGateway: + case discovery.RpcService.MessageGateway: return []prometheus.Collector{OnlineUserGauge} - case share.RpcRegisterName.Msg: + case discovery.RpcService.Msg: return []prometheus.Collector{ SingleChatMsgProcessSuccessCounter, SingleChatMsgProcessFailedCounter, GroupChatMsgProcessSuccessCounter, GroupChatMsgProcessFailedCounter, } - case share.RpcRegisterName.Push: + case discovery.RpcService.Push: return []prometheus.Collector{ MsgOfflinePushFailedCounter, MsgLoneTimePushCounter, } - case share.RpcRegisterName.Auth: + case discovery.RpcService.Auth: return []prometheus.Collector{UserLoginCounter} - case share.RpcRegisterName.User: + case discovery.RpcService.User: return []prometheus.Collector{UserRegisterCounter} default: return nil diff --git a/pkg/common/startrpc/start.go b/pkg/common/startrpc/start.go index 26fbb0ffa..7671b1736 100644 --- a/pkg/common/startrpc/start.go +++ b/pkg/common/startrpc/start.go @@ -77,7 +77,7 @@ func Start[T any](ctx context.Context, discovery *config.Discovery, prometheusCo if err != nil { return err } - cs := prommetrics.GetGrpcCusMetrics(rpcRegisterName, share) + cs := prommetrics.GetGrpcCusMetrics(rpcRegisterName, discovery) go func() { if err := prommetrics.RpcInit(cs, prometheusPort); err != nil && !errors.Is(err, http.ErrServerClosed) { netErr = errs.WrapMsg(err, fmt.Sprintf("rpc %s prometheus start err: %d", rpcRegisterName, prometheusPort)) @@ -111,7 +111,7 @@ func Start[T any](ctx context.Context, discovery *config.Discovery, prometheusCo "prometheusPorts", prometheusConfig.Ports) defer listener.Close() - client, err := kdisc.NewDiscoveryRegister(discovery, share) + client, err := kdisc.NewDiscoveryRegister(discovery) if err != nil { return err } diff --git a/tools/check-component/main.go b/tools/check-component/main.go index 94dbd613c..a84b7bd87 100644 --- a/tools/check-component/main.go +++ b/tools/check-component/main.go @@ -160,10 +160,6 @@ func performChecks(ctx context.Context, mongoConfig *config.Mongo, redisConfig * checks["Etcd"] = func(ctx context.Context) error { return CheckEtcd(ctx, &discovery.Etcd) } - } else if discovery.Enable == "zookeeper" { - checks["Zookeeper"] = func(ctx context.Context) error { - return CheckZookeeper(ctx, &discovery.ZooKeeper) - } } for i := 0; i < maxRetry; i++ {