diff --git a/internal/api/route.go b/internal/api/route.go index bacbf3f53..304187ce4 100644 --- a/internal/api/route.go +++ b/internal/api/route.go @@ -17,7 +17,7 @@ package api import ( "context" "fmt" - "github.com/openimsdk/open-im-server/v3/pkg/common/cmd" + "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/tools/db/redisutil" "github.com/openimsdk/tools/utils/datautil" "github.com/openimsdk/tools/utils/network" @@ -53,7 +53,17 @@ import ( "google.golang.org/grpc/credentials/insecure" ) -func Start(ctx context.Context, index int, config *cmd.ApiConfig) error { +type Config struct { + RpcConfig config.API + RedisConfig config.Redis + MongodbConfig config.Mongo + ZookeeperConfig config.ZooKeeper + NotificationConfig config.Notification + Share config.Share + MinioConfig config.Minio +} + +func Start(ctx context.Context, index int, config *Config) error { apiPort, err := datautil.GetElemByIndex(config.RpcConfig.Api.Ports, index) if err != nil { return err @@ -128,7 +138,7 @@ func Start(ctx context.Context, index int, config *cmd.ApiConfig) error { return nil } -func newGinRouter(disCov discovery.SvcDiscoveryRegistry, rdb redis.UniversalClient, config *cmd.ApiConfig) *gin.Engine { +func newGinRouter(disCov discovery.SvcDiscoveryRegistry, rdb redis.UniversalClient, config *Config) *gin.Engine { disCov.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, "round_robin"))) gin.SetMode(gin.ReleaseMode) @@ -311,7 +321,7 @@ func newGinRouter(disCov discovery.SvcDiscoveryRegistry, rdb redis.UniversalClie return r } -func GinParseToken(rdb redis.UniversalClient, config *cmd.ApiConfig) gin.HandlerFunc { +func GinParseToken(rdb redis.UniversalClient, config *Config) gin.HandlerFunc { //todo TokenPolicy dataBase := controller.NewAuthDatabase( cache.NewTokenCacheModel(rdb), @@ -371,8 +381,9 @@ func GinParseToken(rdb redis.UniversalClient, config *cmd.ApiConfig) gin.Handler } // // handleGinError logs and returns an error response through Gin context. -// func handleGinError(c *gin.Context, logMessage string, errType errs.CodeError, detail string) { -// wrappedErr := errType.Wrap(detail) -// apiresp.GinError(c, wrappedErr) -// c.Abort() -// } +// +// func handleGinError(c *gin.Context, logMessage string, errType errs.CodeError, detail string) { +// wrappedErr := errType.Wrap(detail) +// apiresp.GinError(c, wrappedErr) +// c.Abort() +// } diff --git a/internal/msggateway/hub_server.go b/internal/msggateway/hub_server.go index 1e00931e7..155bc4350 100644 --- a/internal/msggateway/hub_server.go +++ b/internal/msggateway/hub_server.go @@ -16,7 +16,6 @@ package msggateway import ( "context" - "github.com/openimsdk/open-im-server/v3/pkg/common/cmd" "github.com/openimsdk/tools/db/redisutil" "github.com/openimsdk/open-im-server/v3/pkg/authverify" @@ -32,7 +31,7 @@ import ( "google.golang.org/grpc" ) -func (s *Server) InitServer(ctx context.Context, config *cmd.MsgGatewayConfig, disCov discovery.SvcDiscoveryRegistry, server *grpc.Server) error { +func (s *Server) InitServer(ctx context.Context, config *Config, disCov discovery.SvcDiscoveryRegistry, server *grpc.Server) error { rdb, err := redisutil.NewRedisClient(ctx, config.RedisConfig.Build()) if err != nil { return err @@ -45,7 +44,7 @@ func (s *Server) InitServer(ctx context.Context, config *cmd.MsgGatewayConfig, d return nil } -func (s *Server) Start(ctx context.Context, index int, conf *cmd.MsgGatewayConfig) error { +func (s *Server) Start(ctx context.Context, index int, conf *Config) error { return startrpc.Start(ctx, &conf.ZookeeperConfig, &conf.MsgGateway.Prometheus, conf.MsgGateway.ListenIP, conf.MsgGateway.RPC.RegisterIP, conf.MsgGateway.RPC.Ports, index, @@ -59,7 +58,7 @@ type Server struct { rpcPort int prometheusPort int LongConnServer LongConnServer - config *cmd.MsgGatewayConfig + config *Config pushTerminal map[int]struct{} } @@ -67,7 +66,7 @@ func (s *Server) SetLongConnServer(LongConnServer LongConnServer) { s.LongConnServer = LongConnServer } -func NewServer(rpcPort int, proPort int, longConnServer LongConnServer, conf *cmd.MsgGatewayConfig) *Server { +func NewServer(rpcPort int, proPort int, longConnServer LongConnServer, conf *Config) *Server { s := &Server{ rpcPort: rpcPort, prometheusPort: proPort, diff --git a/internal/msggateway/init.go b/internal/msggateway/init.go index 6377d258c..0e704706d 100644 --- a/internal/msggateway/init.go +++ b/internal/msggateway/init.go @@ -16,15 +16,23 @@ package msggateway import ( "context" - "github.com/openimsdk/open-im-server/v3/pkg/common/cmd" + "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/tools/utils/datautil" "time" "github.com/openimsdk/tools/log" ) +type Config struct { + MsgGateway config.MsgGateway + RedisConfig config.Redis + ZookeeperConfig config.ZooKeeper + Share config.Share + WebhooksConfig config.Webhooks +} + // Start run ws server. -func Start(ctx context.Context, index int, conf *cmd.MsgGatewayConfig) error { +func Start(ctx context.Context, index int, conf *Config) error { log.CInfo(ctx, "MSG-GATEWAY server is initializing", "rpcPorts", conf.MsgGateway.RPC.Ports, "wsPort", conf.MsgGateway.LongConnSvr.Ports, "prometheusPorts", conf.MsgGateway.Prometheus.Ports) wsPort, err := datautil.GetElemByIndex(conf.MsgGateway.LongConnSvr.Ports, index) diff --git a/internal/msggateway/n_ws_server.go b/internal/msggateway/n_ws_server.go index 12b0c7764..fc018904d 100644 --- a/internal/msggateway/n_ws_server.go +++ b/internal/msggateway/n_ws_server.go @@ -18,7 +18,6 @@ import ( "context" "encoding/json" "fmt" - "github.com/openimsdk/open-im-server/v3/pkg/common/cmd" "net/http" "strconv" "sync" @@ -49,7 +48,7 @@ type LongConnServer interface { GetUserPlatformCons(userID string, platform int) ([]*Client, bool, bool) Validate(s any) error SetCacheHandler(cache cache.TokenModel) - SetDiscoveryRegistry(client discovery.SvcDiscoveryRegistry, config *cmd.MsgGatewayConfig) + SetDiscoveryRegistry(client discovery.SvcDiscoveryRegistry, config *Config) KickUserConn(client *Client) error UnRegister(c *Client) SetKickHandlerInfo(i *kickHandler) @@ -59,7 +58,7 @@ type LongConnServer interface { } type WsServer struct { - msgGatewayConfig *cmd.MsgGatewayConfig + msgGatewayConfig *Config port int wsMaxConnNum int64 registerChan chan *Client @@ -86,7 +85,7 @@ type kickHandler struct { newClient *Client } -func (ws *WsServer) SetDiscoveryRegistry(disCov discovery.SvcDiscoveryRegistry, config *cmd.MsgGatewayConfig) { +func (ws *WsServer) SetDiscoveryRegistry(disCov discovery.SvcDiscoveryRegistry, config *Config) { ws.MessageHandler = NewGrpcHandler(ws.validate, disCov, &config.Share.RpcRegisterName) u := rpcclient.NewUserRpcClient(disCov, config.Share.RpcRegisterName.User, &config.Share.IMAdmin) ws.userClient = &u @@ -132,7 +131,7 @@ func (ws *WsServer) GetUserPlatformCons(userID string, platform int) ([]*Client, return ws.clients.Get(userID, platform) } -func NewWsServer(msgGatewayConfig *cmd.MsgGatewayConfig, opts ...Option) (*WsServer, error) { +func NewWsServer(msgGatewayConfig *Config, opts ...Option) (*WsServer, error) { var config configs for _, o := range opts { o(&config) diff --git a/internal/msgtransfer/init.go b/internal/msgtransfer/init.go index daad05881..d0b09ccc8 100644 --- a/internal/msgtransfer/init.go +++ b/internal/msgtransfer/init.go @@ -17,7 +17,6 @@ package msgtransfer import ( "context" "fmt" - "github.com/openimsdk/open-im-server/v3/pkg/common/cmd" "github.com/openimsdk/tools/db/mongoutil" "github.com/openimsdk/tools/db/redisutil" "github.com/openimsdk/tools/utils/datautil" @@ -56,7 +55,17 @@ type MsgTransfer struct { cancel context.CancelFunc } -func Start(ctx context.Context, index int, config *cmd.MsgTransferConfig) error { +type Config struct { + MsgTransfer config.MsgTransfer + RedisConfig config.Redis + MongodbConfig config.Mongo + KafkaConfig config.Kafka + ZookeeperConfig config.ZooKeeper + Share config.Share + WebhooksConfig config.Webhooks +} + +func Start(ctx context.Context, index int, config *Config) error { log.CInfo(ctx, "MSG-TRANSFER server is initializing", "prometheusPorts", config.MsgTransfer.Prometheus.Ports, "index", index) mgocli, err := mongoutil.NewMongoDB(ctx, config.MongodbConfig.Build()) if err != nil { @@ -112,7 +121,7 @@ func NewMsgTransfer(kafkaConf *config.Kafka, msgDatabase controller.CommonMsgDat }, nil } -func (m *MsgTransfer) Start(index int, config *cmd.MsgTransferConfig) error { +func (m *MsgTransfer) Start(index int, config *Config) error { prometheusPort, err := datautil.GetElemByIndex(config.MsgTransfer.Prometheus.Ports, index) if err != nil { return err diff --git a/internal/push/push_rpc_server.go b/internal/push/push_rpc_server.go index 1306455c8..27f739a69 100644 --- a/internal/push/push_rpc_server.go +++ b/internal/push/push_rpc_server.go @@ -16,7 +16,7 @@ package push import ( "context" - "github.com/openimsdk/open-im-server/v3/pkg/common/cmd" + "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/tools/db/redisutil" "github.com/openimsdk/open-im-server/v3/pkg/common/db/cache" @@ -35,7 +35,18 @@ type pushServer struct { pusher *Pusher } -func Start(ctx context.Context, config *cmd.PushConfig, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error { +type Config struct { + RpcConfig config.Push + RedisConfig config.Redis + MongodbConfig config.Mongo + KafkaConfig config.Kafka + ZookeeperConfig config.ZooKeeper + NotificationConfig config.Notification + Share config.Share + WebhooksConfig config.Webhooks +} + +func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error { rdb, err := redisutil.NewRedisClient(ctx, config.RedisConfig.Build()) if err != nil { return err diff --git a/internal/push/push_to_client.go b/internal/push/push_to_client.go index f94c31cb2..c5a01b7c9 100644 --- a/internal/push/push_to_client.go +++ b/internal/push/push_to_client.go @@ -17,7 +17,6 @@ package push import ( "context" "encoding/json" - "github.com/openimsdk/open-im-server/v3/pkg/common/cmd" "github.com/openimsdk/tools/errs" "sync" @@ -49,7 +48,7 @@ import ( ) type Pusher struct { - config *cmd.PushConfig + config *Config database controller.PushDatabase discov discovery.SvcDiscoveryRegistry offlinePusher offlinepush.OfflinePusher @@ -62,7 +61,7 @@ type Pusher struct { var errNoOfflinePusher = errs.New("no offlinePusher is configured") -func NewPusher(config *cmd.PushConfig, discov discovery.SvcDiscoveryRegistry, offlinePusher offlinepush.OfflinePusher, database controller.PushDatabase, +func NewPusher(config *Config, discov discovery.SvcDiscoveryRegistry, offlinePusher offlinepush.OfflinePusher, database controller.PushDatabase, groupLocalCache *rpccache.GroupLocalCache, conversationLocalCache *rpccache.ConversationLocalCache, conversationRpcClient *rpcclient.ConversationRpcClient, groupRpcClient *rpcclient.GroupRpcClient, msgRpcClient *rpcclient.MessageRpcClient, ) *Pusher { diff --git a/internal/rpc/auth/auth.go b/internal/rpc/auth/auth.go index 2a91e548d..425594779 100644 --- a/internal/rpc/auth/auth.go +++ b/internal/rpc/auth/auth.go @@ -16,7 +16,7 @@ package auth import ( "context" - "github.com/openimsdk/open-im-server/v3/pkg/common/cmd" + "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/tools/db/redisutil" "github.com/openimsdk/open-im-server/v3/pkg/authverify" @@ -40,10 +40,17 @@ type authServer struct { authDatabase controller.AuthDatabase userRpcClient *rpcclient.UserRpcClient RegisterCenter discovery.SvcDiscoveryRegistry - config *cmd.AuthConfig + config *Config } -func Start(ctx context.Context, config *cmd.AuthConfig, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error { +type Config struct { + RpcConfig config.Auth + RedisConfig config.Redis + ZookeeperConfig config.ZooKeeper + Share config.Share +} + +func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error { rdb, err := redisutil.NewRedisClient(ctx, config.RedisConfig.Build()) if err != nil { return err diff --git a/internal/rpc/conversation/conversaion.go b/internal/rpc/conversation/conversaion.go index 74ecdd28c..6ee874d73 100644 --- a/internal/rpc/conversation/conversaion.go +++ b/internal/rpc/conversation/conversaion.go @@ -16,7 +16,7 @@ package conversation import ( "context" - "github.com/openimsdk/open-im-server/v3/pkg/common/cmd" + "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/tools/db/redisutil" "sort" @@ -45,10 +45,19 @@ type conversationServer struct { groupRpcClient *rpcclient.GroupRpcClient conversationDatabase controller.ConversationDatabase conversationNotificationSender *notification.ConversationNotificationSender - config *cmd.ConversationConfig + config *Config } -func Start(ctx context.Context, config *cmd.ConversationConfig, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error { +type Config struct { + RpcConfig config.Conversation + RedisConfig config.Redis + MongodbConfig config.Mongo + ZookeeperConfig config.ZooKeeper + NotificationConfig config.Notification + Share config.Share +} + +func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error { mgocli, err := mongoutil.NewMongoDB(ctx, config.MongodbConfig.Build()) if err != nil { return err diff --git a/internal/rpc/friend/friend.go b/internal/rpc/friend/friend.go index 1a7c17104..a19ed5ddb 100644 --- a/internal/rpc/friend/friend.go +++ b/internal/rpc/friend/friend.go @@ -16,7 +16,7 @@ package friend import ( "context" - "github.com/openimsdk/open-im-server/v3/pkg/common/cmd" + "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/tools/db/redisutil" "github.com/openimsdk/open-im-server/v3/pkg/authverify" @@ -45,10 +45,20 @@ type friendServer struct { notificationSender *notification.FriendNotificationSender conversationRpcClient rpcclient.ConversationRpcClient RegisterCenter discovery.SvcDiscoveryRegistry - config *cmd.FriendConfig + config *Config } -func Start(ctx context.Context, config *cmd.FriendConfig, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error { +type Config struct { + RpcConfig config.Friend + RedisConfig config.Redis + MongodbConfig config.Mongo + ZookeeperConfig config.ZooKeeper + NotificationConfig config.Notification + Share config.Share + WebhooksConfig config.Webhooks +} + +func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error { mgocli, err := mongoutil.NewMongoDB(ctx, config.MongodbConfig.Build()) if err != nil { return err diff --git a/internal/rpc/group/group.go b/internal/rpc/group/group.go index cc58ff4ca..038910b2c 100644 --- a/internal/rpc/group/group.go +++ b/internal/rpc/group/group.go @@ -17,7 +17,7 @@ package group import ( "context" "fmt" - "github.com/openimsdk/open-im-server/v3/pkg/common/cmd" + "github.com/openimsdk/open-im-server/v3/pkg/common/config" "math/big" "math/rand" "strconv" @@ -58,10 +58,20 @@ type groupServer struct { notification *notification.GroupNotificationSender conversationRpcClient rpcclient.ConversationRpcClient msgRpcClient rpcclient.MessageRpcClient - config *cmd.GroupConfig + config *Config } -func Start(ctx context.Context, config *cmd.GroupConfig, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error { +type Config struct { + RpcConfig config.Group + RedisConfig config.Redis + MongodbConfig config.Mongo + ZookeeperConfig config.ZooKeeper + NotificationConfig config.Notification + Share config.Share + WebhooksConfig config.Webhooks +} + +func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error { mgocli, err := mongoutil.NewMongoDB(ctx, config.MongodbConfig.Build()) if err != nil { return err diff --git a/internal/rpc/msg/message_interceptor.go b/internal/rpc/msg/message_interceptor.go index 1c7094e73..0b0566029 100644 --- a/internal/rpc/msg/message_interceptor.go +++ b/internal/rpc/msg/message_interceptor.go @@ -16,7 +16,6 @@ package msg import ( "context" - "github.com/openimsdk/open-im-server/v3/pkg/common/cmd" "github.com/openimsdk/open-im-server/v3/pkg/common/servererrs" "github.com/openimsdk/protocol/constant" @@ -24,9 +23,9 @@ import ( "github.com/openimsdk/protocol/sdkws" ) -type MessageInterceptorFunc func(ctx context.Context, globalConfig *cmd.MsgConfig, req *msg.SendMsgReq) (*sdkws.MsgData, error) +type MessageInterceptorFunc func(ctx context.Context, globalConfig *Config, req *msg.SendMsgReq) (*sdkws.MsgData, error) -func MessageHasReadEnabled(ctx context.Context, config *cmd.MsgConfig, req *msg.SendMsgReq) (*sdkws.MsgData, error) { +func MessageHasReadEnabled(ctx context.Context, config *Config, req *msg.SendMsgReq) (*sdkws.MsgData, error) { switch { case req.MsgData.ContentType == constant.HasReadReceipt && req.MsgData.SessionType == constant.SingleChatType: if !config.RpcConfig.SingleMessageHasReadReceiptEnable { diff --git a/internal/rpc/msg/server.go b/internal/rpc/msg/server.go index 58f21b18b..2af313b8f 100644 --- a/internal/rpc/msg/server.go +++ b/internal/rpc/msg/server.go @@ -16,7 +16,7 @@ package msg import ( "context" - "github.com/openimsdk/open-im-server/v3/pkg/common/cmd" + "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/tools/db/mongoutil" "github.com/openimsdk/tools/db/redisutil" @@ -47,7 +47,18 @@ type ( ConversationLocalCache *rpccache.ConversationLocalCache // Local cache for conversation data. Handlers MessageInterceptorChain // Chain of handlers for processing messages. notificationSender *rpcclient.NotificationSender // RPC client for sending notifications. - config *cmd.MsgConfig // Global configuration settings. + config *Config // Global configuration settings. + } + + Config struct { + RpcConfig config.Msg + RedisConfig config.Redis + MongodbConfig config.Mongo + KafkaConfig config.Kafka + ZookeeperConfig config.ZooKeeper + NotificationConfig config.Notification + Share config.Share + WebhooksConfig config.Webhooks } ) @@ -56,7 +67,7 @@ func (m *msgServer) addInterceptorHandler(interceptorFunc ...MessageInterceptorF } -func Start(ctx context.Context, config *cmd.MsgConfig, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error { +func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error { mgocli, err := mongoutil.NewMongoDB(ctx, config.MongodbConfig.Build()) if err != nil { return err diff --git a/internal/rpc/msg/utils.go b/internal/rpc/msg/utils.go index 9e5a656e0..15ebd6274 100644 --- a/internal/rpc/msg/utils.go +++ b/internal/rpc/msg/utils.go @@ -15,7 +15,6 @@ package msg import ( - "github.com/openimsdk/open-im-server/v3/pkg/common/cmd" "github.com/openimsdk/protocol/constant" "github.com/openimsdk/protocol/sdkws" "github.com/openimsdk/tools/errs" @@ -23,7 +22,7 @@ import ( "go.mongodb.org/mongo-driver/mongo" ) -func isMessageHasReadEnabled(msgData *sdkws.MsgData, config *cmd.MsgConfig) bool { +func isMessageHasReadEnabled(msgData *sdkws.MsgData, config *Config) bool { switch { case msgData.ContentType == constant.HasReadReceipt && msgData.SessionType == constant.SingleChatType: if config.RpcConfig.SingleMessageHasReadReceiptEnable { diff --git a/internal/rpc/third/third.go b/internal/rpc/third/third.go index 4db71020a..6aa214a34 100644 --- a/internal/rpc/third/third.go +++ b/internal/rpc/third/third.go @@ -17,7 +17,7 @@ package third import ( "context" "fmt" - "github.com/openimsdk/open-im-server/v3/pkg/common/cmd" + "github.com/openimsdk/open-im-server/v3/pkg/common/config" "net/url" "time" @@ -43,10 +43,19 @@ type thirdServer struct { s3dataBase controller.S3Database userRpcClient rpcclient.UserRpcClient defaultExpire time.Duration - config *cmd.ThirdConfig + config *Config +} +type Config struct { + RpcConfig config.Third + RedisConfig config.Redis + MongodbConfig config.Mongo + ZookeeperConfig config.ZooKeeper + NotificationConfig config.Notification + Share config.Share + MinioConfig config.Minio } -func Start(ctx context.Context, config *cmd.ThirdConfig, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error { +func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error { mgocli, err := mongoutil.NewMongoDB(ctx, config.MongodbConfig.Build()) if err != nil { return err @@ -80,11 +89,11 @@ func Start(ctx context.Context, config *cmd.ThirdConfig, client discovery.SvcDis var o s3.Interface switch enable { case "minio": - o, err = minio.NewMinio(cache.NewMinioCache(rdb), minio.Config(config.Object.Minio)) + o, err = minio.NewMinio(cache.NewMinioCache(rdb), *config.MinioConfig.Build()) case "cos": - o, err = cos.NewCos(cos.Config(config.Object.Cos)) + o, err = cos.NewCos(*config.RpcConfig.Object.Cos.Build()) case "oss": - o, err = oss.NewOSS(oss.Config(config.Object.Oss)) + o, err = oss.NewOSS(*config.RpcConfig.Object.Oss.Build()) default: err = fmt.Errorf("invalid object enable: %s", enable) } @@ -93,8 +102,8 @@ func Start(ctx context.Context, config *cmd.ThirdConfig, client discovery.SvcDis } third.RegisterThirdServer(server, &thirdServer{ apiURL: apiURL, - thirdDatabase: controller.NewThirdDatabase(cache.NewMsgCacheModel(rdb, config.MsgCacheTimeout, &config.Redis), logdb), - userRpcClient: rpcclient.NewUserRpcClient(client, config.RpcRegisterName.OpenImUserName, &config.Manager, &config.IMAdmin), + thirdDatabase: controller.NewThirdDatabase(cache.NewThirdCache(rdb), logdb), + userRpcClient: rpcclient.NewUserRpcClient(client, config.Share.RpcRegisterName.User, &config.Share.IMAdmin), s3dataBase: controller.NewS3Database(rdb, o, s3db), defaultExpire: time.Hour * 24 * 7, config: config, diff --git a/internal/rpc/user/user.go b/internal/rpc/user/user.go index 1c30df47a..c32ac0788 100644 --- a/internal/rpc/user/user.go +++ b/internal/rpc/user/user.go @@ -16,7 +16,7 @@ package user import ( "context" - "github.com/openimsdk/open-im-server/v3/pkg/common/cmd" + "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/tools/db/redisutil" "math/rand" "strings" @@ -51,10 +51,21 @@ type userServer struct { friendRpcClient *rpcclient.FriendRpcClient groupRpcClient *rpcclient.GroupRpcClient RegisterCenter registry.SvcDiscoveryRegistry - config *cmd.UserConfig + config *Config } -func Start(ctx context.Context, config *cmd.UserConfig, client registry.SvcDiscoveryRegistry, server *grpc.Server) error { +type Config struct { + RpcConfig config.User + RedisConfig config.Redis + MongodbConfig config.Mongo + KafkaConfig config.Kafka + ZookeeperConfig config.ZooKeeper + NotificationConfig config.Notification + Share config.Share + WebhooksConfig config.Webhooks +} + +func Start(ctx context.Context, config *Config, client registry.SvcDiscoveryRegistry, server *grpc.Server) error { mgocli, err := mongoutil.NewMongoDB(ctx, config.MongodbConfig.Build()) if err != nil { return err diff --git a/internal/tools/cron_task.go b/internal/tools/cron_task.go index bc330af70..82ce95eda 100644 --- a/internal/tools/cron_task.go +++ b/internal/tools/cron_task.go @@ -16,7 +16,7 @@ package tools import ( "context" - "github.com/openimsdk/open-im-server/v3/pkg/common/cmd" + "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/tools/db/redisutil" "os" "os/signal" @@ -29,7 +29,16 @@ import ( "github.com/robfig/cron/v3" ) -func Start(ctx context.Context, config *cmd.CronTaskConfig) error { +type CronTaskConfig struct { + CronTask config.CronTask + RedisConfig config.Redis + MongodbConfig config.Mongo + ZookeeperConfig config.ZooKeeper + Share config.Share + KafkaConfig config.Kafka +} + +func Start(ctx context.Context, config *CronTaskConfig) error { log.CInfo(ctx, "CRON-TASK server is initializing", "chatRecordsClearTime", config.CronTask.ChatRecordsClearTime, "msgDestructTime", config.CronTask.MsgDestructTime) @@ -94,7 +103,7 @@ func netlock(rdb redis.UniversalClient, key string, ttl time.Duration) bool { return ok } -func cronWrapFunc(config *cmd.CronTaskConfig, rdb redis.UniversalClient, key string, fn func()) func() { +func cronWrapFunc(config *CronTaskConfig, rdb redis.UniversalClient, key string, fn func()) func() { enableCronLocker := config.CronTask.EnableCronLocker return func() { // if don't enable cron-locker, call fn directly. diff --git a/internal/tools/msg.go b/internal/tools/msg.go index f2bbc57d9..32ad227a6 100644 --- a/internal/tools/msg.go +++ b/internal/tools/msg.go @@ -17,7 +17,6 @@ package tools import ( "context" "fmt" - "github.com/openimsdk/open-im-server/v3/pkg/common/cmd" "github.com/openimsdk/tools/db/redisutil" "math" "math/rand" @@ -47,12 +46,12 @@ type MsgTool struct { userDatabase controller.UserDatabase groupDatabase controller.GroupDatabase msgNotificationSender *notification.MsgNotificationSender - config *cmd.CronTaskConfig + config *CronTaskConfig } func NewMsgTool(msgDatabase controller.CommonMsgDatabase, userDatabase controller.UserDatabase, groupDatabase controller.GroupDatabase, conversationDatabase controller.ConversationDatabase, - msgNotificationSender *notification.MsgNotificationSender, config *cmd.CronTaskConfig, + msgNotificationSender *notification.MsgNotificationSender, config *CronTaskConfig, ) *MsgTool { return &MsgTool{ msgDatabase: msgDatabase, @@ -64,7 +63,7 @@ func NewMsgTool(msgDatabase controller.CommonMsgDatabase, userDatabase controlle } } -func InitMsgTool(ctx context.Context, config *cmd.CronTaskConfig) (*MsgTool, error) { +func InitMsgTool(ctx context.Context, config *CronTaskConfig) (*MsgTool, error) { mgocli, err := mongoutil.NewMongoDB(ctx, config.MongodbConfig.Build()) if err != nil { return nil, err diff --git a/pkg/common/cmd/api.go b/pkg/common/cmd/api.go index b94fe1a79..e25758c59 100644 --- a/pkg/common/cmd/api.go +++ b/pkg/common/cmd/api.go @@ -26,20 +26,11 @@ type ApiCmd struct { *RootCmd ctx context.Context configMap map[string]StructEnvPrefix - apiConfig ApiConfig -} -type ApiConfig struct { - RpcConfig config.API - RedisConfig config.Redis - MongodbConfig config.Mongo - ZookeeperConfig config.ZooKeeper - NotificationConfig config.Notification - Share config.Share - MinioConfig config.Minio + apiConfig api.Config } func NewApiCmd() *ApiCmd { - var apiConfig ApiConfig + var apiConfig api.Config ret := &ApiCmd{apiConfig: apiConfig} ret.configMap = map[string]StructEnvPrefix{ OpenIMAPICfgFileName: {EnvPrefix: apiEnvPrefix, ConfigStruct: &apiConfig.RpcConfig}, diff --git a/pkg/common/cmd/auth.go b/pkg/common/cmd/auth.go index fbf01deb9..c7b4d88e3 100644 --- a/pkg/common/cmd/auth.go +++ b/pkg/common/cmd/auth.go @@ -27,17 +27,11 @@ type AuthRpcCmd struct { *RootCmd ctx context.Context configMap map[string]StructEnvPrefix - authConfig AuthConfig -} -type AuthConfig struct { - RpcConfig config.Auth - RedisConfig config.Redis - ZookeeperConfig config.ZooKeeper - Share config.Share + authConfig auth.Config } func NewAuthRpcCmd() *AuthRpcCmd { - var authConfig AuthConfig + var authConfig auth.Config ret := &AuthRpcCmd{authConfig: authConfig} ret.configMap = map[string]StructEnvPrefix{ OpenIMRPCAuthCfgFileName: {EnvPrefix: authEnvPrefix, ConfigStruct: &authConfig.RpcConfig}, diff --git a/pkg/common/cmd/conversation.go b/pkg/common/cmd/conversation.go index f6d242b15..eeed10f56 100644 --- a/pkg/common/cmd/conversation.go +++ b/pkg/common/cmd/conversation.go @@ -27,19 +27,11 @@ type ConversationRpcCmd struct { *RootCmd ctx context.Context configMap map[string]StructEnvPrefix - conversationConfig ConversationConfig -} -type ConversationConfig struct { - RpcConfig config.Conversation - RedisConfig config.Redis - MongodbConfig config.Mongo - ZookeeperConfig config.ZooKeeper - NotificationConfig config.Notification - Share config.Share + conversationConfig conversation.Config } func NewConversationRpcCmd() *ConversationRpcCmd { - var conversationConfig ConversationConfig + var conversationConfig conversation.Config ret := &ConversationRpcCmd{conversationConfig: conversationConfig} ret.configMap = map[string]StructEnvPrefix{ OpenIMRPCConversationCfgFileName: {EnvPrefix: conversationEnvPrefix, ConfigStruct: &conversationConfig.RpcConfig}, diff --git a/pkg/common/cmd/cron_task.go b/pkg/common/cmd/cron_task.go index c0a6710ec..185e60f7e 100644 --- a/pkg/common/cmd/cron_task.go +++ b/pkg/common/cmd/cron_task.go @@ -26,19 +26,11 @@ type CronTaskCmd struct { *RootCmd ctx context.Context configMap map[string]StructEnvPrefix - cronTaskConfig CronTaskConfig -} -type CronTaskConfig struct { - CronTask config.CronTask - RedisConfig config.Redis - MongodbConfig config.Mongo - ZookeeperConfig config.ZooKeeper - Share config.Share - KafkaConfig config.Kafka + cronTaskConfig tools.CronTaskConfig } func NewCronTaskCmd() *CronTaskCmd { - var cronTaskConfig CronTaskConfig + var cronTaskConfig tools.CronTaskConfig ret := &CronTaskCmd{cronTaskConfig: cronTaskConfig} ret.configMap = map[string]StructEnvPrefix{ OpenIMCronTaskCfgFileName: {EnvPrefix: cornTaskEnvPrefix, ConfigStruct: &cronTaskConfig.CronTask}, diff --git a/pkg/common/cmd/friend.go b/pkg/common/cmd/friend.go index e22d4157b..d7f97552d 100644 --- a/pkg/common/cmd/friend.go +++ b/pkg/common/cmd/friend.go @@ -27,20 +27,11 @@ type FriendRpcCmd struct { *RootCmd ctx context.Context configMap map[string]StructEnvPrefix - friendConfig FriendConfig -} -type FriendConfig struct { - RpcConfig config.Friend - RedisConfig config.Redis - MongodbConfig config.Mongo - ZookeeperConfig config.ZooKeeper - NotificationConfig config.Notification - Share config.Share - WebhooksConfig config.Webhooks + friendConfig friend.Config } func NewFriendRpcCmd() *FriendRpcCmd { - var friendConfig FriendConfig + var friendConfig friend.Config ret := &FriendRpcCmd{friendConfig: friendConfig} ret.configMap = map[string]StructEnvPrefix{ OpenIMRPCFriendCfgFileName: {EnvPrefix: friendEnvPrefix, ConfigStruct: &friendConfig.RpcConfig}, diff --git a/pkg/common/cmd/group.go b/pkg/common/cmd/group.go index 192fa23b1..047068233 100644 --- a/pkg/common/cmd/group.go +++ b/pkg/common/cmd/group.go @@ -27,20 +27,11 @@ type GroupRpcCmd struct { *RootCmd ctx context.Context configMap map[string]StructEnvPrefix - groupConfig GroupConfig -} -type GroupConfig struct { - RpcConfig config.Group - RedisConfig config.Redis - MongodbConfig config.Mongo - ZookeeperConfig config.ZooKeeper - NotificationConfig config.Notification - Share config.Share - WebhooksConfig config.Webhooks + groupConfig group.Config } func NewGroupRpcCmd() *GroupRpcCmd { - var groupConfig GroupConfig + var groupConfig group.Config ret := &GroupRpcCmd{groupConfig: groupConfig} ret.configMap = map[string]StructEnvPrefix{ OpenIMRPCGroupCfgFileName: {EnvPrefix: groupEnvPrefix, ConfigStruct: &groupConfig.RpcConfig}, diff --git a/pkg/common/cmd/msg.go b/pkg/common/cmd/msg.go index e8592f214..d8b0ae8a8 100644 --- a/pkg/common/cmd/msg.go +++ b/pkg/common/cmd/msg.go @@ -27,21 +27,11 @@ type MsgRpcCmd struct { *RootCmd ctx context.Context configMap map[string]StructEnvPrefix - msgConfig MsgConfig -} -type MsgConfig struct { - RpcConfig config.Msg - RedisConfig config.Redis - MongodbConfig config.Mongo - KafkaConfig config.Kafka - ZookeeperConfig config.ZooKeeper - NotificationConfig config.Notification - Share config.Share - WebhooksConfig config.Webhooks + msgConfig msg.Config } func NewMsgRpcCmd() *MsgRpcCmd { - var msgConfig MsgConfig + var msgConfig msg.Config ret := &MsgRpcCmd{msgConfig: msgConfig} ret.configMap = map[string]StructEnvPrefix{ OpenIMRPCMsgCfgFileName: {EnvPrefix: msgEnvPrefix, ConfigStruct: &msgConfig.RpcConfig}, diff --git a/pkg/common/cmd/msg_gateway.go b/pkg/common/cmd/msg_gateway.go index 48c0328cc..05ab0466e 100644 --- a/pkg/common/cmd/msg_gateway.go +++ b/pkg/common/cmd/msg_gateway.go @@ -28,18 +28,11 @@ type MsgGatewayCmd struct { *RootCmd ctx context.Context configMap map[string]StructEnvPrefix - msgGatewayConfig MsgGatewayConfig -} -type MsgGatewayConfig struct { - MsgGateway config.MsgGateway - RedisConfig config.Redis - ZookeeperConfig config.ZooKeeper - Share config.Share - WebhooksConfig config.Webhooks + msgGatewayConfig msggateway.Config } func NewMsgGatewayCmd() *MsgGatewayCmd { - var msgGatewayConfig MsgGatewayConfig + var msgGatewayConfig msggateway.Config ret := &MsgGatewayCmd{msgGatewayConfig: msgGatewayConfig} ret.configMap = map[string]StructEnvPrefix{ OpenIMMsgGatewayCfgFileName: {EnvPrefix: msgGatewayEnvPrefix, ConfigStruct: &msgGatewayConfig.MsgGateway}, diff --git a/pkg/common/cmd/msg_transfer.go b/pkg/common/cmd/msg_transfer.go index 990ffdcd3..b4677f04b 100644 --- a/pkg/common/cmd/msg_transfer.go +++ b/pkg/common/cmd/msg_transfer.go @@ -26,20 +26,11 @@ type MsgTransferCmd struct { *RootCmd ctx context.Context configMap map[string]StructEnvPrefix - msgTransferConfig MsgTransferConfig -} -type MsgTransferConfig struct { - MsgTransfer config.MsgTransfer - RedisConfig config.Redis - MongodbConfig config.Mongo - KafkaConfig config.Kafka - ZookeeperConfig config.ZooKeeper - Share config.Share - WebhooksConfig config.Webhooks + msgTransferConfig msgtransfer.Config } func NewMsgTransferCmd() *MsgTransferCmd { - var msgTransferConfig MsgTransferConfig + var msgTransferConfig msgtransfer.Config ret := &MsgTransferCmd{msgTransferConfig: msgTransferConfig} ret.configMap = map[string]StructEnvPrefix{ OpenIMMsgTransferCfgFileName: {EnvPrefix: msgTransferEnvPrefix, ConfigStruct: &msgTransferConfig.MsgTransfer}, diff --git a/pkg/common/cmd/push.go b/pkg/common/cmd/push.go index eab29fa1c..1055b2876 100644 --- a/pkg/common/cmd/push.go +++ b/pkg/common/cmd/push.go @@ -27,21 +27,11 @@ type PushRpcCmd struct { *RootCmd ctx context.Context configMap map[string]StructEnvPrefix - pushConfig PushConfig -} -type PushConfig struct { - RpcConfig config.Push - RedisConfig config.Redis - MongodbConfig config.Mongo - KafkaConfig config.Kafka - ZookeeperConfig config.ZooKeeper - NotificationConfig config.Notification - Share config.Share - WebhooksConfig config.Webhooks + pushConfig push.Config } func NewPushRpcCmd() *PushRpcCmd { - var pushConfig PushConfig + var pushConfig push.Config ret := &PushRpcCmd{pushConfig: pushConfig} ret.configMap = map[string]StructEnvPrefix{ OpenIMPushCfgFileName: {EnvPrefix: pushEnvPrefix, ConfigStruct: &pushConfig.RpcConfig}, diff --git a/pkg/common/cmd/third.go b/pkg/common/cmd/third.go index b6728d3c0..0225cbcb1 100644 --- a/pkg/common/cmd/third.go +++ b/pkg/common/cmd/third.go @@ -27,20 +27,11 @@ type ThirdRpcCmd struct { *RootCmd ctx context.Context configMap map[string]StructEnvPrefix - thirdConfig ThirdConfig -} -type ThirdConfig struct { - RpcConfig config.Third - RedisConfig config.Redis - MongodbConfig config.Mongo - ZookeeperConfig config.ZooKeeper - NotificationConfig config.Notification - Share config.Share - MinioConfig config.Minio + thirdConfig third.Config } func NewThirdRpcCmd() *ThirdRpcCmd { - var thirdConfig ThirdConfig + var thirdConfig third.Config ret := &ThirdRpcCmd{thirdConfig: thirdConfig} ret.configMap = map[string]StructEnvPrefix{ OpenIMRPCThirdCfgFileName: {EnvPrefix: thridEnvPrefix, ConfigStruct: &thirdConfig.RpcConfig}, diff --git a/pkg/common/cmd/user.go b/pkg/common/cmd/user.go index eed5066c7..55b96ec1d 100644 --- a/pkg/common/cmd/user.go +++ b/pkg/common/cmd/user.go @@ -27,21 +27,11 @@ type UserRpcCmd struct { *RootCmd ctx context.Context configMap map[string]StructEnvPrefix - userConfig UserConfig -} -type UserConfig struct { - RpcConfig config.User - RedisConfig config.Redis - MongodbConfig config.Mongo - KafkaConfig config.Kafka - ZookeeperConfig config.ZooKeeper - NotificationConfig config.Notification - Share config.Share - WebhooksConfig config.Webhooks + userConfig user.Config } func NewUserRpcCmd() *UserRpcCmd { - var userConfig UserConfig + var userConfig user.Config ret := &UserRpcCmd{userConfig: userConfig} ret.configMap = map[string]StructEnvPrefix{ OpenIMRPCUserCfgFileName: {EnvPrefix: userEnvPrefix, ConfigStruct: &userConfig.RpcConfig}, diff --git a/pkg/common/config/config.go b/pkg/common/config/config.go index daf897aad..7ad815e86 100644 --- a/pkg/common/config/config.go +++ b/pkg/common/config/config.go @@ -15,10 +15,13 @@ package config import ( + "fmt" "github.com/openimsdk/tools/db/mongoutil" "github.com/openimsdk/tools/db/redisutil" "github.com/openimsdk/tools/mq/kafka" + "github.com/openimsdk/tools/s3/cos" "github.com/openimsdk/tools/s3/minio" + "github.com/openimsdk/tools/s3/oss" "time" ) @@ -277,23 +280,9 @@ type Third struct { Prometheus Prometheus `mapstructure:"prometheus"` Object struct { Enable string `mapstructure:"enable"` - Cos struct { - BucketURL string `mapstructure:"bucketURL"` - SecretID string `mapstructure:"secretID"` - SecretKey string `mapstructure:"secretKey"` - SessionToken string `mapstructure:"sessionToken"` - PublicRead bool `mapstructure:"publicRead"` - } `mapstructure:"cos"` - Oss struct { - Endpoint string `mapstructure:"endpoint"` - Bucket string `mapstructure:"bucket"` - BucketURL string `mapstructure:"bucketURL"` - AccessKeyID string `mapstructure:"accessKeyID"` - AccessKeySecret string `mapstructure:"accessKeySecret"` - SessionToken string `mapstructure:"sessionToken"` - PublicRead bool `mapstructure:"publicRead"` - } `mapstructure:"oss"` - Kodo struct { + Cos Cos `mapstructure:"cos"` + Oss Oss `mapstructure:"oss"` + Kodo struct { Endpoint string `mapstructure:"endpoint"` Bucket string `mapstructure:"bucket"` BucketURL string `mapstructure:"bucketURL"` @@ -312,6 +301,22 @@ type Third struct { } `mapstructure:"aws"` } `mapstructure:"object"` } +type Cos struct { + BucketURL string `mapstructure:"bucketURL"` + SecretID string `mapstructure:"secretID"` + SecretKey string `mapstructure:"secretKey"` + SessionToken string `mapstructure:"sessionToken"` + PublicRead bool `mapstructure:"publicRead"` +} +type Oss struct { + Endpoint string `mapstructure:"endpoint"` + Bucket string `mapstructure:"bucket"` + BucketURL string `mapstructure:"bucketURL"` + AccessKeyID string `mapstructure:"accessKeyID"` + AccessKeySecret string `mapstructure:"accessKeySecret"` + SessionToken string `mapstructure:"sessionToken"` + PublicRead bool `mapstructure:"publicRead"` +} type User struct { RPC struct { @@ -474,15 +479,36 @@ func (k *Kafka) Build() *kafka.Config { func (m *Minio) Build() *minio.Config { return &minio.Config{ Bucket: m.Bucket, - Endpoint: "", + Endpoint: fmt.Sprintf("http://%s:%d", m.InternalIP, m.Port), AccessKeyID: m.AccessKeyID, SecretAccessKey: m.SecretAccessKey, SessionToken: m.SessionToken, - SignEndpoint: "", + SignEndpoint: fmt.Sprintf("http://%s:%d", m.ExternalIP, m.Port), PublicRead: m.PublicRead, } } +func (c *Cos) Build() *cos.Config { + return &cos.Config{ + BucketURL: c.BucketURL, + SecretID: c.SecretID, + SecretKey: c.SecretKey, + SessionToken: c.SessionToken, + PublicRead: c.PublicRead, + } +} + +func (o *Oss) Build() *oss.Config { + return &oss.Config{ + Endpoint: o.Endpoint, + Bucket: o.Bucket, + BucketURL: o.BucketURL, + AccessKeyID: o.AccessKeyID, + AccessKeySecret: o.AccessKeySecret, + SessionToken: o.SessionToken, + PublicRead: o.PublicRead, + } +} func (l *CacheConfig) Failed() time.Duration { return time.Second * time.Duration(l.FailedExpire) diff --git a/pkg/common/db/controller/msg.go b/pkg/common/db/controller/msg.go index e51597057..28914a21f 100644 --- a/pkg/common/db/controller/msg.go +++ b/pkg/common/db/controller/msg.go @@ -17,7 +17,7 @@ package controller import ( "context" "encoding/json" - "github.com/openimsdk/open-im-server/v3/pkg/common/cmd" + "github.com/openimsdk/open-im-server/v3/internal/tools" "time" "github.com/openimsdk/open-im-server/v3/pkg/common/config" @@ -133,7 +133,7 @@ func NewCommonMsgDatabase(msgDocModel relation.MsgDocModelInterface, msg cache.M }, nil } -func InitCommonMsgDatabase(rdb redis.UniversalClient, database *mongo.Database, config *cmd.CronTaskConfig) (CommonMsgDatabase, error) { +func InitCommonMsgDatabase(rdb redis.UniversalClient, database *mongo.Database, config *tools.CronTaskConfig) (CommonMsgDatabase, error) { msgDocModel, err := mgo.NewMsgMongo(database) if err != nil { return nil, err diff --git a/pkg/common/db/controller/push.go b/pkg/common/db/controller/push.go index cf72027ee..390d70b7b 100644 --- a/pkg/common/db/controller/push.go +++ b/pkg/common/db/controller/push.go @@ -25,7 +25,7 @@ type PushDatabase interface { } type pushDataBase struct { - cache cache.MsgModel + cache cache.ThirdCache } func NewPushDatabase(cache cache.ThirdCache) PushDatabase { diff --git a/pkg/common/db/controller/third.go b/pkg/common/db/controller/third.go index 2e7940a0d..be618843f 100644 --- a/pkg/common/db/controller/third.go +++ b/pkg/common/db/controller/third.go @@ -34,7 +34,7 @@ type ThirdDatabase interface { } type thirdDatabase struct { - cache cache.MsgModel + cache cache.ThirdCache logdb relation.LogInterface } @@ -58,7 +58,7 @@ func (t *thirdDatabase) UploadLogs(ctx context.Context, logs []*relation.LogMode return t.logdb.Create(ctx, logs) } -func NewThirdDatabase(cache cache.MsgModel, logdb relation.LogInterface) ThirdDatabase { +func NewThirdDatabase(cache cache.ThirdCache, logdb relation.LogInterface) ThirdDatabase { return &thirdDatabase{cache: cache, logdb: logdb} } diff --git a/pkg/common/discoveryregister/discoveryregister.go b/pkg/common/discoveryregister/discoveryregister.go index 7da7c5f33..962e46ef3 100644 --- a/pkg/common/discoveryregister/discoveryregister.go +++ b/pkg/common/discoveryregister/discoveryregister.go @@ -19,39 +19,35 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister/direct" "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister/kubernetes" "github.com/openimsdk/tools/discovery" + "github.com/openimsdk/tools/discovery/zookeeper" "github.com/openimsdk/tools/errs" "time" ) const ( - zookeeper = "zoopkeeper" - kubenetes = "k8s" - - directT = "direct" + zookeeperConst = "zoopkeeper" + kubenetesConst = "k8s" + directConst = "direct" ) // NewDiscoveryRegister creates a new service discovery and registry client based on the provided environment type. -func NewDiscoveryRegister(zookeeperConfig *config.ZooKeeper) (discovery.SvcDiscoveryRegistry, error) { +func NewDiscoveryRegister(zookeeperConfig *config.ZooKeeper, env string) (discovery.SvcDiscoveryRegistry, error) { - switch zookeeperConfig.Env { - case "zookeeper": - return zookeeper.NewZookeeperDiscoveryRegister(&config.Zookeeper) - zk, err := zookeeper.NewZkClient( - zookeeperConfig.zkAddr, - schema, + switch env { + case zookeeperConst: + return zookeeper.NewZkClient( + zookeeperConfig.Address, + zookeeperConfig.Schema, zookeeper.WithFreq(time.Hour), - zookeeper.WithUserNameAndPassword(username, password), + zookeeper.WithUserNameAndPassword(zookeeperConfig.Username, zookeeperConfig.Password), zookeeper.WithRoundRobin(), zookeeper.WithTimeout(10), ) - if err != nil { - return nil, err - } - case "k8s": + case kubenetesConst: return kubernetes.NewK8sDiscoveryRegister(config.RpcRegisterName.OpenImMessageGatewayName) - case "direct": + case directConst: return direct.NewConnDirect(config) default: - return nil, errs.New("unsupported discovery type", "type", config.Envs.Discovery).Wrap() + return nil, errs.New("unsupported discovery type", "type", env).Wrap() } } diff --git a/pkg/rpcclient/conversation.go b/pkg/rpcclient/conversation.go index c750665a7..6eb6c9461 100644 --- a/pkg/rpcclient/conversation.go +++ b/pkg/rpcclient/conversation.go @@ -18,7 +18,6 @@ import ( "context" "fmt" - "github.com/openimsdk/open-im-server/v3/pkg/common/config" pbconversation "github.com/openimsdk/protocol/conversation" "github.com/openimsdk/tools/discovery" "github.com/openimsdk/tools/errs" @@ -30,7 +29,6 @@ type Conversation struct { Client pbconversation.ConversationClient conn grpc.ClientConnInterface discov discovery.SvcDiscoveryRegistry - Config *config.GlobalConfig } func NewConversation(discov discovery.SvcDiscoveryRegistry, rpcRegisterName string) *Conversation {