diff --git a/internal/api/route.go b/internal/api/route.go index 5b5e99d31..0cdae0fc7 100644 --- a/internal/api/route.go +++ b/internal/api/route.go @@ -56,7 +56,7 @@ func Start(config *config.GlobalConfig, port int, proPort int) error { err := "port or proPort is empty:" + strconv.Itoa(port) + "," + strconv.Itoa(proPort) return errs.Wrap(fmt.Errorf(err)) } - rdb, err := cache.NewRedis(config) + rdb, err := cache.NewRedis(&config.Redis) if err != nil { return err } @@ -139,13 +139,13 @@ func newGinRouter(disCov discoveryregistry.SvcDiscoveryRegistry, rdb redis.Unive } r.Use(gin.Recovery(), mw.CorsHandler(), mw.GinParseOperationID()) // init rpc client here - userRpc := rpcclient.NewUser(disCov, config) - groupRpc := rpcclient.NewGroup(disCov, config) - friendRpc := rpcclient.NewFriend(disCov, config) - messageRpc := rpcclient.NewMessage(disCov, config) - conversationRpc := rpcclient.NewConversation(disCov, config) - authRpc := rpcclient.NewAuth(disCov, config) - thirdRpc := rpcclient.NewThird(disCov, config) + userRpc := rpcclient.NewUser(disCov, config.RpcRegisterName.OpenImUserName, config.RpcRegisterName.OpenImMessageGatewayName) + groupRpc := rpcclient.NewGroup(disCov, config.RpcRegisterName.OpenImGroupName) + friendRpc := rpcclient.NewFriend(disCov, config.RpcRegisterName.OpenImFriendName) + messageRpc := rpcclient.NewMessage(disCov, config.RpcRegisterName.OpenImMsgName) + conversationRpc := rpcclient.NewConversation(disCov, config.RpcRegisterName.OpenImConversationName) + authRpc := rpcclient.NewAuth(disCov, config.RpcRegisterName.OpenImAuthName) + thirdRpc := rpcclient.NewThird(disCov, config.RpcRegisterName.OpenImThirdName, config.Prometheus.GrafanaUrl) u := NewUserApi(*userRpc) m := NewMessageApi(messageRpc, userRpc) @@ -313,10 +313,9 @@ func newGinRouter(disCov discoveryregistry.SvcDiscoveryRegistry, rdb redis.Unive func GinParseToken(rdb redis.UniversalClient, config *config.GlobalConfig) gin.HandlerFunc { dataBase := controller.NewAuthDatabase( - cache.NewMsgCacheModel(rdb, config), + cache.NewMsgCacheModel(rdb, config.MsgCacheTimeout, &config.Redis), config.Secret, config.TokenPolicy.Expire, - config, ) return func(c *gin.Context) { switch c.Request.Method { diff --git a/internal/api/third.go b/internal/api/third.go index fd9ac146d..4558656cf 100644 --- a/internal/api/third.go +++ b/internal/api/third.go @@ -127,5 +127,5 @@ func (o *ThirdApi) SearchLogs(c *gin.Context) { } func (o *ThirdApi) GetPrometheus(c *gin.Context) { - c.Redirect(http.StatusFound, o.Config.Prometheus.GrafanaUrl) + c.Redirect(http.StatusFound, o.GrafanaUrl) } diff --git a/internal/api/user.go b/internal/api/user.go index 468432ee0..3b2e4d24f 100644 --- a/internal/api/user.go +++ b/internal/api/user.go @@ -70,7 +70,7 @@ func (u *UserApi) GetUsersOnlineStatus(c *gin.Context) { apiresp.GinError(c, err) return } - conns, err := u.Discov.GetConns(c, u.Config.RpcRegisterName.OpenImMessageGatewayName) + conns, err := u.Discov.GetConns(c, u.MessageGateWayRpcName) if err != nil { apiresp.GinError(c, err) return @@ -134,7 +134,7 @@ func (u *UserApi) GetUsersOnlineTokenDetail(c *gin.Context) { apiresp.GinError(c, errs.ErrArgs.WithDetail(err.Error()).Wrap()) return } - conns, err := u.Discov.GetConns(c, u.Config.RpcRegisterName.OpenImMessageGatewayName) + conns, err := u.Discov.GetConns(c, u.MessageGateWayRpcName) if err != nil { apiresp.GinError(c, err) return diff --git a/internal/msggateway/hub_server.go b/internal/msggateway/hub_server.go index f3949d223..ed18c64ea 100644 --- a/internal/msggateway/hub_server.go +++ b/internal/msggateway/hub_server.go @@ -32,12 +32,12 @@ import ( ) func (s *Server) InitServer(config *config.GlobalConfig, disCov discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) error { - rdb, err := cache.NewRedis(config) + rdb, err := cache.NewRedis(&config.Redis) if err != nil { return err } - msgModel := cache.NewMsgCacheModel(rdb, config) + msgModel := cache.NewMsgCacheModel(rdb, config.MsgCacheTimeout, &config.Redis) s.LongConnServer.SetDiscoveryRegistry(disCov, config) s.LongConnServer.SetCacheHandler(msgModel) msggateway.RegisterMsgGatewayServer(server, s) diff --git a/internal/msggateway/message_handler.go b/internal/msggateway/message_handler.go index 2b5deae25..5d4ff3d50 100644 --- a/internal/msggateway/message_handler.go +++ b/internal/msggateway/message_handler.go @@ -107,9 +107,9 @@ type GrpcHandler struct { validate *validator.Validate } -func NewGrpcHandler(validate *validator.Validate, client discoveryregistry.SvcDiscoveryRegistry, config *config.GlobalConfig) *GrpcHandler { - msgRpcClient := rpcclient.NewMessageRpcClient(client, config) - pushRpcClient := rpcclient.NewPushRpcClient(client, config) +func NewGrpcHandler(validate *validator.Validate, client discoveryregistry.SvcDiscoveryRegistry, rpcRegisterName *config.RpcRegisterName) *GrpcHandler { + msgRpcClient := rpcclient.NewMessageRpcClient(client, rpcRegisterName.OpenImMsgName) + pushRpcClient := rpcclient.NewPushRpcClient(client, rpcRegisterName.OpenImPushName) return &GrpcHandler{ msgRpcClient: &msgRpcClient, pushClient: &pushRpcClient, validate: validate, diff --git a/internal/msggateway/n_ws_server.go b/internal/msggateway/n_ws_server.go index 38ead43f0..1fd651f22 100644 --- a/internal/msggateway/n_ws_server.go +++ b/internal/msggateway/n_ws_server.go @@ -95,8 +95,8 @@ type kickHandler struct { } func (ws *WsServer) SetDiscoveryRegistry(disCov discoveryregistry.SvcDiscoveryRegistry, config *config.GlobalConfig) { - ws.MessageHandler = NewGrpcHandler(ws.validate, disCov, config) - u := rpcclient.NewUserRpcClient(disCov, config) + ws.MessageHandler = NewGrpcHandler(ws.validate, disCov, &config.RpcRegisterName) + u := rpcclient.NewUserRpcClient(disCov, config.RpcRegisterName.OpenImUserName) ws.userClient = &u ws.disCov = disCov } diff --git a/internal/msgtransfer/init.go b/internal/msgtransfer/init.go index 5cd3bc364..188433c30 100644 --- a/internal/msgtransfer/init.go +++ b/internal/msgtransfer/init.go @@ -55,7 +55,7 @@ type MsgTransfer struct { } func StartTransfer(config *config.GlobalConfig, prometheusPort int) error { - rdb, err := cache.NewRedis(config) + rdb, err := cache.NewRedis(&config.Redis) if err != nil { return err } diff --git a/pkg/common/db/cache/init_redis.go b/pkg/common/db/cache/init_redis.go index f01220b33..e56992c1e 100644 --- a/pkg/common/db/cache/init_redis.go +++ b/pkg/common/db/cache/init_redis.go @@ -39,32 +39,32 @@ const ( ) // NewRedis Initialize redis connection. -func NewRedis(config *config.GlobalConfig) (redis.UniversalClient, error) { +func NewRedis(redisConf *config.Redis) (redis.UniversalClient, error) { if redisClient != nil { return redisClient, nil } // Read configuration from environment variables - overrideConfigFromEnv(config) + overrideConfigFromEnv(redisConf) - if len(config.Redis.Address) == 0 { + if len(redisConf.Address) == 0 { return nil, errs.Wrap(errors.New("redis address is empty")) } specialerror.AddReplace(redis.Nil, errs.ErrRecordNotFound) var rdb redis.UniversalClient - if len(config.Redis.Address) > 1 || config.Redis.ClusterMode { + if len(redisConf.Address) > 1 || redisConf.ClusterMode { rdb = redis.NewClusterClient(&redis.ClusterOptions{ - Addrs: config.Redis.Address, - Username: config.Redis.Username, - Password: config.Redis.Password, // no password set + Addrs: redisConf.Address, + Username: redisConf.Username, + Password: redisConf.Password, // no password set PoolSize: 50, MaxRetries: maxRetry, }) } else { rdb = redis.NewClient(&redis.Options{ - Addr: config.Redis.Address[0], - Username: config.Redis.Username, - Password: config.Redis.Password, + Addr: redisConf.Address[0], + Username: redisConf.Username, + Password: redisConf.Password, DB: 0, // use default DB PoolSize: 100, // connection pool size MaxRetries: maxRetry, @@ -76,8 +76,8 @@ func NewRedis(config *config.GlobalConfig) (redis.UniversalClient, error) { defer cancel() err = rdb.Ping(ctx).Err() if err != nil { - errMsg := fmt.Sprintf("address:%s, username:%s, password:%s, clusterMode:%t, enablePipeline:%t", config.Redis.Address, config.Redis.Username, - config.Redis.Password, config.Redis.ClusterMode, config.Redis.EnablePipeline) + errMsg := fmt.Sprintf("address:%s, username:%s, password:%s, clusterMode:%t, enablePipeline:%t", redisConf.Address, redisConf.Username, + redisConf.Password, redisConf.ClusterMode, redisConf.EnablePipeline) return nil, errs.Wrap(err, errMsg) } redisClient = rdb @@ -85,24 +85,24 @@ func NewRedis(config *config.GlobalConfig) (redis.UniversalClient, error) { } // overrideConfigFromEnv overrides configuration fields with environment variables if present. -func overrideConfigFromEnv(config *config.GlobalConfig) { +func overrideConfigFromEnv(redis *config.Redis) { if envAddr := os.Getenv("REDIS_ADDRESS"); envAddr != "" { if envPort := os.Getenv("REDIS_PORT"); envPort != "" { addresses := strings.Split(envAddr, ",") for i, addr := range addresses { addresses[i] = addr + ":" + envPort } - config.Redis.Address = addresses + redis.Address = addresses } else { - config.Redis.Address = strings.Split(envAddr, ",") + redis.Address = strings.Split(envAddr, ",") } } if envUser := os.Getenv("REDIS_USERNAME"); envUser != "" { - config.Redis.Username = envUser + redis.Username = envUser } if envPass := os.Getenv("REDIS_PASSWORD"); envPass != "" { - config.Redis.Password = envPass + redis.Password = envPass } } diff --git a/pkg/common/db/cache/msg.go b/pkg/common/db/cache/msg.go index 364a67937..bf64a73c6 100644 --- a/pkg/common/db/cache/msg.go +++ b/pkg/common/db/cache/msg.go @@ -122,14 +122,15 @@ type MsgModel interface { UnLockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error } -func NewMsgCacheModel(client redis.UniversalClient, config *config.GlobalConfig) MsgModel { - return &msgCache{rdb: client, config: config} +func NewMsgCacheModel(client redis.UniversalClient, msgCacheTimeout int, redisConf *config.Redis) MsgModel { + return &msgCache{rdb: client, msgCacheTimeout: msgCacheTimeout, redisConf: redisConf} } type msgCache struct { metaCache - rdb redis.UniversalClient - config *config.GlobalConfig + rdb redis.UniversalClient + msgCacheTimeout int + redisConf *config.Redis } func (c *msgCache) getMaxSeqKey(conversationID string) string { @@ -317,7 +318,7 @@ func (c *msgCache) allMessageCacheKey(conversationID string) string { } func (c *msgCache) GetMessagesBySeq(ctx context.Context, conversationID string, seqs []int64) (seqMsgs []*sdkws.MsgData, failedSeqs []int64, err error) { - if c.config.Redis.EnablePipeline { + if c.redisConf.EnablePipeline { return c.PipeGetMessagesBySeq(ctx, conversationID, seqs) } @@ -418,7 +419,7 @@ func (c *msgCache) ParallelGetMessagesBySeq(ctx context.Context, conversationID } func (c *msgCache) SetMessageToCache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (int, error) { - if c.config.Redis.EnablePipeline { + if c.redisConf.EnablePipeline { return c.PipeSetMessageToCache(ctx, conversationID, msgs) } return c.ParallelSetMessageToCache(ctx, conversationID, msgs) @@ -433,7 +434,7 @@ func (c *msgCache) PipeSetMessageToCache(ctx context.Context, conversationID str } key := c.getMessageCacheKey(conversationID, msg.Seq) - _ = pipe.Set(ctx, key, s, time.Duration(c.config.MsgCacheTimeout)*time.Second) + _ = pipe.Set(ctx, key, s, time.Duration(c.msgCacheTimeout)*time.Second) } results, err := pipe.Exec(ctx) @@ -463,7 +464,7 @@ func (c *msgCache) ParallelSetMessageToCache(ctx context.Context, conversationID } key := c.getMessageCacheKey(conversationID, msg.Seq) - if err := c.rdb.Set(ctx, key, s, time.Duration(c.config.MsgCacheTimeout)*time.Second).Err(); err != nil { + if err := c.rdb.Set(ctx, key, s, time.Duration(c.msgCacheTimeout)*time.Second).Err(); err != nil { return errs.Wrap(err) } return nil @@ -498,10 +499,10 @@ func (c *msgCache) UserDeleteMsgs(ctx context.Context, conversationID string, se if err != nil { return errs.Wrap(err) } - if err := c.rdb.Expire(ctx, delUserListKey, time.Duration(c.config.MsgCacheTimeout)*time.Second).Err(); err != nil { + if err := c.rdb.Expire(ctx, delUserListKey, time.Duration(c.msgCacheTimeout)*time.Second).Err(); err != nil { return errs.Wrap(err) } - if err := c.rdb.Expire(ctx, userDelListKey, time.Duration(c.config.MsgCacheTimeout)*time.Second).Err(); err != nil { + if err := c.rdb.Expire(ctx, userDelListKey, time.Duration(c.msgCacheTimeout)*time.Second).Err(); err != nil { return errs.Wrap(err) } } @@ -606,7 +607,7 @@ func (c *msgCache) DelUserDeleteMsgsList(ctx context.Context, conversationID str } func (c *msgCache) DeleteMessages(ctx context.Context, conversationID string, seqs []int64) error { - if c.config.Redis.EnablePipeline { + if c.redisConf.EnablePipeline { return c.PipeDeleteMessages(ctx, conversationID, seqs) } @@ -688,7 +689,7 @@ func (c *msgCache) DelMsgFromCache(ctx context.Context, userID string, seqs []in if err != nil { return errs.Wrap(err) } - if err := c.rdb.Set(ctx, key, s, time.Duration(c.config.MsgCacheTimeout)*time.Second).Err(); err != nil { + if err := c.rdb.Set(ctx, key, s, time.Duration(c.msgCacheTimeout)*time.Second).Err(); err != nil { return errs.Wrap(err) } } diff --git a/pkg/common/db/controller/auth.go b/pkg/common/db/controller/auth.go index fbdd77833..a1f83d710 100644 --- a/pkg/common/db/controller/auth.go +++ b/pkg/common/db/controller/auth.go @@ -23,7 +23,6 @@ import ( "github.com/golang-jwt/jwt/v4" "github.com/openimsdk/open-im-server/v3/pkg/authverify" - "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/db/cache" ) @@ -38,11 +37,10 @@ type authDatabase struct { cache cache.MsgModel accessSecret string accessExpire int64 - config *config.GlobalConfig } -func NewAuthDatabase(cache cache.MsgModel, accessSecret string, accessExpire int64, config *config.GlobalConfig) AuthDatabase { - return &authDatabase{cache: cache, accessSecret: accessSecret, accessExpire: accessExpire, config: config} +func NewAuthDatabase(cache cache.MsgModel, accessSecret string, accessExpire int64) AuthDatabase { + return &authDatabase{cache: cache, accessSecret: accessSecret, accessExpire: accessExpire} } // If the result is empty. @@ -58,7 +56,7 @@ func (a *authDatabase) CreateToken(ctx context.Context, userID string, platformI } var deleteTokenKey []string for k, v := range tokens { - _, err = tokenverify.GetClaimFromToken(k, authverify.Secret(a.config.Secret)) + _, err = tokenverify.GetClaimFromToken(k, authverify.Secret(a.accessSecret)) if err != nil || v != constant.NormalToken { deleteTokenKey = append(deleteTokenKey, k) } diff --git a/pkg/common/discoveryregister/direct/directconn.go b/pkg/common/discoveryregister/direct/directconn.go index 7c2df0000..538b106c1 100644 --- a/pkg/common/discoveryregister/direct/directconn.go +++ b/pkg/common/discoveryregister/direct/directconn.go @@ -28,17 +28,18 @@ import ( type ServiceAddresses map[string][]int -func getServiceAddresses(config *config2.GlobalConfig) ServiceAddresses { +func getServiceAddresses(rpcRegisterName *config2.RpcRegisterName, + rpcPort *config2.RpcPort, longConnSvrPort []int) ServiceAddresses { return ServiceAddresses{ - config.RpcRegisterName.OpenImUserName: config.RpcPort.OpenImUserPort, - config.RpcRegisterName.OpenImFriendName: config.RpcPort.OpenImFriendPort, - config.RpcRegisterName.OpenImMsgName: config.RpcPort.OpenImMessagePort, - config.RpcRegisterName.OpenImMessageGatewayName: config.LongConnSvr.OpenImMessageGatewayPort, - config.RpcRegisterName.OpenImGroupName: config.RpcPort.OpenImGroupPort, - config.RpcRegisterName.OpenImAuthName: config.RpcPort.OpenImAuthPort, - config.RpcRegisterName.OpenImPushName: config.RpcPort.OpenImPushPort, - config.RpcRegisterName.OpenImConversationName: config.RpcPort.OpenImConversationPort, - config.RpcRegisterName.OpenImThirdName: config.RpcPort.OpenImThirdPort, + rpcRegisterName.OpenImUserName: rpcPort.OpenImUserPort, + rpcRegisterName.OpenImFriendName: rpcPort.OpenImFriendPort, + rpcRegisterName.OpenImMsgName: rpcPort.OpenImMessagePort, + rpcRegisterName.OpenImMessageGatewayName: longConnSvrPort, + rpcRegisterName.OpenImGroupName: rpcPort.OpenImGroupPort, + rpcRegisterName.OpenImAuthName: rpcPort.OpenImAuthPort, + rpcRegisterName.OpenImPushName: rpcPort.OpenImPushPort, + rpcRegisterName.OpenImConversationName: rpcPort.OpenImConversationPort, + rpcRegisterName.OpenImThirdName: rpcPort.OpenImThirdPort, } } @@ -96,7 +97,8 @@ func (cd *ConnDirect) GetConns(ctx context.Context, if conns, exists := cd.conns[serviceName]; exists { return conns, nil } - ports := getServiceAddresses(cd.config)[serviceName] + ports := getServiceAddresses(&cd.config.RpcRegisterName, + &cd.config.RpcPort, cd.config.LongConnSvr.OpenImMessageGatewayPort)[serviceName] var connections []*grpc.ClientConn for _, port := range ports { conn, err := cd.dialServiceWithoutResolver(ctx, fmt.Sprintf(cd.config.Rpc.ListenIP+":%d", port), append(cd.additionalOpts, opts...)...) @@ -114,7 +116,8 @@ func (cd *ConnDirect) GetConns(ctx context.Context, func (cd *ConnDirect) GetConn(ctx context.Context, serviceName string, opts ...grpc.DialOption) (*grpc.ClientConn, error) { // Get service addresses - addresses := getServiceAddresses(cd.config) + addresses := getServiceAddresses(&cd.config.RpcRegisterName, + &cd.config.RpcPort, cd.config.LongConnSvr.OpenImMessageGatewayPort) address, ok := addresses[serviceName] if !ok { return nil, errs.Wrap(errors.New("unknown service name"), "serviceName", serviceName) diff --git a/pkg/common/discoveryregister/discoveryregister.go b/pkg/common/discoveryregister/discoveryregister.go index 948629311..2d1bee25e 100644 --- a/pkg/common/discoveryregister/discoveryregister.go +++ b/pkg/common/discoveryregister/discoveryregister.go @@ -36,7 +36,7 @@ func NewDiscoveryRegister(config *config.GlobalConfig) (discoveryregistry.SvcDis switch config.Envs.Discovery { case "zookeeper": - return zookeeper.NewZookeeperDiscoveryRegister(config) + return zookeeper.NewZookeeperDiscoveryRegister(&config.Zookeeper) case "k8s": return kubernetes.NewK8sDiscoveryRegister(config.RpcRegisterName.OpenImMessageGatewayName) case "direct": diff --git a/pkg/common/discoveryregister/zookeeper/zookeeper.go b/pkg/common/discoveryregister/zookeeper/zookeeper.go index 08ccad1c5..58bd7013f 100644 --- a/pkg/common/discoveryregister/zookeeper/zookeeper.go +++ b/pkg/common/discoveryregister/zookeeper/zookeeper.go @@ -29,11 +29,11 @@ import ( ) // NewZookeeperDiscoveryRegister creates a new instance of ZookeeperDR for Zookeeper service discovery and registration. -func NewZookeeperDiscoveryRegister(config *config.GlobalConfig) (discoveryregistry.SvcDiscoveryRegistry, error) { - schema := getEnv("ZOOKEEPER_SCHEMA", config.Zookeeper.Schema) - zkAddr := getZkAddrFromEnv(config.Zookeeper.ZkAddr) - username := getEnv("ZOOKEEPER_USERNAME", config.Zookeeper.Username) - password := getEnv("ZOOKEEPER_PASSWORD", config.Zookeeper.Password) +func NewZookeeperDiscoveryRegister(zkConf *config.Zookeeper) (discoveryregistry.SvcDiscoveryRegistry, error) { + schema := getEnv("ZOOKEEPER_SCHEMA", zkConf.Schema) + zkAddr := getZkAddrFromEnv(zkConf.ZkAddr) + username := getEnv("ZOOKEEPER_USERNAME", zkConf.Username) + password := getEnv("ZOOKEEPER_PASSWORD", zkConf.Password) zk, err := openkeeper.NewClient( zkAddr, @@ -47,10 +47,10 @@ func NewZookeeperDiscoveryRegister(config *config.GlobalConfig) (discoveryregist if err != nil { uriFormat := "address:%s, username:%s, password:%s, schema:%s." errInfo := fmt.Sprintf(uriFormat, - config.Zookeeper.ZkAddr, - config.Zookeeper.Username, - config.Zookeeper.Password, - config.Zookeeper.Schema) + zkConf.ZkAddr, + zkConf.Username, + zkConf.Password, + zkConf.Schema) return nil, errs.Wrap(err, errInfo) } return zk, nil diff --git a/pkg/rpcclient/auth.go b/pkg/rpcclient/auth.go index 4d9776b39..3894043ec 100644 --- a/pkg/rpcclient/auth.go +++ b/pkg/rpcclient/auth.go @@ -21,22 +21,20 @@ import ( "github.com/OpenIMSDK/tools/discoveryregistry" "google.golang.org/grpc" - "github.com/openimsdk/open-im-server/v3/pkg/common/config" util "github.com/openimsdk/open-im-server/v3/pkg/util/genutil" ) -func NewAuth(discov discoveryregistry.SvcDiscoveryRegistry, config *config.GlobalConfig) *Auth { - conn, err := discov.GetConn(context.Background(), config.RpcRegisterName.OpenImAuthName) +func NewAuth(discov discoveryregistry.SvcDiscoveryRegistry, rpcRegisterName string) *Auth { + conn, err := discov.GetConn(context.Background(), rpcRegisterName) if err != nil { util.ExitWithError(err) } client := auth.NewAuthClient(conn) - return &Auth{discov: discov, conn: conn, Client: client, Config: config} + return &Auth{discov: discov, conn: conn, Client: client} } type Auth struct { conn grpc.ClientConnInterface Client auth.AuthClient discov discoveryregistry.SvcDiscoveryRegistry - Config *config.GlobalConfig } diff --git a/pkg/rpcclient/conversation.go b/pkg/rpcclient/conversation.go index 23bc91e08..4546bd521 100644 --- a/pkg/rpcclient/conversation.go +++ b/pkg/rpcclient/conversation.go @@ -34,19 +34,19 @@ type Conversation struct { Config *config.GlobalConfig } -func NewConversation(discov discoveryregistry.SvcDiscoveryRegistry, config *config.GlobalConfig) *Conversation { - conn, err := discov.GetConn(context.Background(), config.RpcRegisterName.OpenImConversationName) +func NewConversation(discov discoveryregistry.SvcDiscoveryRegistry, rpcRegisterName string) *Conversation { + conn, err := discov.GetConn(context.Background(), rpcRegisterName) if err != nil { util.ExitWithError(err) } client := pbconversation.NewConversationClient(conn) - return &Conversation{discov: discov, conn: conn, Client: client, Config: config} + return &Conversation{discov: discov, conn: conn, Client: client} } type ConversationRpcClient Conversation -func NewConversationRpcClient(discov discoveryregistry.SvcDiscoveryRegistry, config *config.GlobalConfig) ConversationRpcClient { - return ConversationRpcClient(*NewConversation(discov, config)) +func NewConversationRpcClient(discov discoveryregistry.SvcDiscoveryRegistry, rpcRegisterName string) ConversationRpcClient { + return ConversationRpcClient(*NewConversation(discov, rpcRegisterName)) } func (c *ConversationRpcClient) GetSingleConversationRecvMsgOpt(ctx context.Context, userID, conversationID string) (int32, error) { diff --git a/pkg/rpcclient/friend.go b/pkg/rpcclient/friend.go index 94c7fe9d1..647999493 100644 --- a/pkg/rpcclient/friend.go +++ b/pkg/rpcclient/friend.go @@ -22,7 +22,6 @@ import ( "github.com/OpenIMSDK/tools/discoveryregistry" "google.golang.org/grpc" - "github.com/openimsdk/open-im-server/v3/pkg/common/config" util "github.com/openimsdk/open-im-server/v3/pkg/util/genutil" ) @@ -30,22 +29,21 @@ type Friend struct { conn grpc.ClientConnInterface Client friend.FriendClient discov discoveryregistry.SvcDiscoveryRegistry - Config *config.GlobalConfig } -func NewFriend(discov discoveryregistry.SvcDiscoveryRegistry, config *config.GlobalConfig) *Friend { - conn, err := discov.GetConn(context.Background(), config.RpcRegisterName.OpenImFriendName) +func NewFriend(discov discoveryregistry.SvcDiscoveryRegistry, rpcRegisterName string) *Friend { + conn, err := discov.GetConn(context.Background(), rpcRegisterName) if err != nil { util.ExitWithError(err) } client := friend.NewFriendClient(conn) - return &Friend{discov: discov, conn: conn, Client: client, Config: config} + return &Friend{discov: discov, conn: conn, Client: client} } type FriendRpcClient Friend -func NewFriendRpcClient(discov discoveryregistry.SvcDiscoveryRegistry, config *config.GlobalConfig) FriendRpcClient { - return FriendRpcClient(*NewFriend(discov, config)) +func NewFriendRpcClient(discov discoveryregistry.SvcDiscoveryRegistry, rpcRegisterName string) FriendRpcClient { + return FriendRpcClient(*NewFriend(discov, rpcRegisterName)) } func (f *FriendRpcClient) GetFriendsInfo( diff --git a/pkg/rpcclient/group.go b/pkg/rpcclient/group.go index 8d9cfc372..50d39c9d3 100644 --- a/pkg/rpcclient/group.go +++ b/pkg/rpcclient/group.go @@ -25,29 +25,27 @@ import ( "github.com/OpenIMSDK/tools/errs" "github.com/OpenIMSDK/tools/utils" - "github.com/openimsdk/open-im-server/v3/pkg/common/config" util "github.com/openimsdk/open-im-server/v3/pkg/util/genutil" ) type Group struct { Client group.GroupClient discov discoveryregistry.SvcDiscoveryRegistry - Config *config.GlobalConfig } -func NewGroup(discov discoveryregistry.SvcDiscoveryRegistry, config *config.GlobalConfig) *Group { - conn, err := discov.GetConn(context.Background(), config.RpcRegisterName.OpenImGroupName) +func NewGroup(discov discoveryregistry.SvcDiscoveryRegistry, rpcRegisterName string) *Group { + conn, err := discov.GetConn(context.Background(), rpcRegisterName) if err != nil { util.ExitWithError(err) } client := group.NewGroupClient(conn) - return &Group{discov: discov, Client: client, Config: config} + return &Group{discov: discov, Client: client} } type GroupRpcClient Group -func NewGroupRpcClient(discov discoveryregistry.SvcDiscoveryRegistry, config *config.GlobalConfig) GroupRpcClient { - return GroupRpcClient(*NewGroup(discov, config)) +func NewGroupRpcClient(discov discoveryregistry.SvcDiscoveryRegistry, rpcRegisterName string) GroupRpcClient { + return GroupRpcClient(*NewGroup(discov, rpcRegisterName)) } func (g *GroupRpcClient) GetGroupInfos( diff --git a/pkg/rpcclient/msg.go b/pkg/rpcclient/msg.go index 188fdda58..e3a6ed926 100644 --- a/pkg/rpcclient/msg.go +++ b/pkg/rpcclient/msg.go @@ -131,22 +131,21 @@ type Message struct { conn grpc.ClientConnInterface Client msg.MsgClient discov discoveryregistry.SvcDiscoveryRegistry - Config *config.GlobalConfig } -func NewMessage(discov discoveryregistry.SvcDiscoveryRegistry, config *config.GlobalConfig) *Message { - conn, err := discov.GetConn(context.Background(), config.RpcRegisterName.OpenImMsgName) +func NewMessage(discov discoveryregistry.SvcDiscoveryRegistry, rpcRegisterName string) *Message { + conn, err := discov.GetConn(context.Background(), rpcRegisterName) if err != nil { util.ExitWithError(err) } client := msg.NewMsgClient(conn) - return &Message{discov: discov, conn: conn, Client: client, Config: config} + return &Message{discov: discov, conn: conn, Client: client} } type MessageRpcClient Message -func NewMessageRpcClient(discov discoveryregistry.SvcDiscoveryRegistry, config *config.GlobalConfig) MessageRpcClient { - return MessageRpcClient(*NewMessage(discov, config)) +func NewMessageRpcClient(discov discoveryregistry.SvcDiscoveryRegistry, rpcRegisterName string) MessageRpcClient { + return MessageRpcClient(*NewMessage(discov, rpcRegisterName)) } // SendMsg sends a message through the gRPC client and returns the response. diff --git a/pkg/rpcclient/push.go b/pkg/rpcclient/push.go index 277ccafb8..3e7c7194a 100644 --- a/pkg/rpcclient/push.go +++ b/pkg/rpcclient/push.go @@ -21,7 +21,6 @@ import ( "github.com/OpenIMSDK/tools/discoveryregistry" "google.golang.org/grpc" - "github.com/openimsdk/open-im-server/v3/pkg/common/config" util "github.com/openimsdk/open-im-server/v3/pkg/util/genutil" ) @@ -31,8 +30,8 @@ type Push struct { discov discoveryregistry.SvcDiscoveryRegistry } -func NewPush(discov discoveryregistry.SvcDiscoveryRegistry, config *config.GlobalConfig) *Push { - conn, err := discov.GetConn(context.Background(), config.RpcRegisterName.OpenImPushName) +func NewPush(discov discoveryregistry.SvcDiscoveryRegistry, rpcRegisterName string) *Push { + conn, err := discov.GetConn(context.Background(), rpcRegisterName) if err != nil { util.ExitWithError(err) } @@ -45,8 +44,8 @@ func NewPush(discov discoveryregistry.SvcDiscoveryRegistry, config *config.Globa type PushRpcClient Push -func NewPushRpcClient(discov discoveryregistry.SvcDiscoveryRegistry, config *config.GlobalConfig) PushRpcClient { - return PushRpcClient(*NewPush(discov, config)) +func NewPushRpcClient(discov discoveryregistry.SvcDiscoveryRegistry, rpcRegisterName string) PushRpcClient { + return PushRpcClient(*NewPush(discov, rpcRegisterName)) } func (p *PushRpcClient) DelUserPushToken(ctx context.Context, req *push.DelUserPushTokenReq) (*push.DelUserPushTokenResp, error) { diff --git a/pkg/rpcclient/third.go b/pkg/rpcclient/third.go index 5e419f477..aa4e3f732 100644 --- a/pkg/rpcclient/third.go +++ b/pkg/rpcclient/third.go @@ -16,59 +16,28 @@ package rpcclient import ( "context" - "net/url" - "github.com/OpenIMSDK/protocol/third" "github.com/OpenIMSDK/tools/discoveryregistry" - "github.com/OpenIMSDK/tools/errs" - "github.com/minio/minio-go/v7" - "github.com/minio/minio-go/v7/pkg/credentials" "google.golang.org/grpc" - "github.com/openimsdk/open-im-server/v3/pkg/common/config" util "github.com/openimsdk/open-im-server/v3/pkg/util/genutil" ) type Third struct { - conn grpc.ClientConnInterface - Client third.ThirdClient - discov discoveryregistry.SvcDiscoveryRegistry - MinioClient *minio.Client - Config *config.GlobalConfig + conn grpc.ClientConnInterface + Client third.ThirdClient + discov discoveryregistry.SvcDiscoveryRegistry + GrafanaUrl string } -func NewThird(discov discoveryregistry.SvcDiscoveryRegistry, config *config.GlobalConfig) *Third { - conn, err := discov.GetConn(context.Background(), config.RpcRegisterName.OpenImThirdName) +func NewThird(discov discoveryregistry.SvcDiscoveryRegistry, rpcRegisterName, grafanaUrl string) *Third { + conn, err := discov.GetConn(context.Background(), rpcRegisterName) if err != nil { util.ExitWithError(err) } client := third.NewThirdClient(conn) - minioClient, err := minioInit(config) if err != nil { util.ExitWithError(err) } - return &Third{discov: discov, Client: client, conn: conn, MinioClient: minioClient, Config: config} -} - -func minioInit(config *config.GlobalConfig) (*minio.Client, error) { - minioClient := &minio.Client{} - initUrl := config.Object.Minio.Endpoint - minioUrl, err := url.Parse(initUrl) - if err != nil { - return nil, errs.Wrap(err, "minioInit: failed to parse MinIO endpoint URL") - } - opts := &minio.Options{ - Creds: credentials.NewStaticV4(config.Object.Minio.AccessKeyID, config.Object.Minio.SecretAccessKey, ""), - // Region: config.Credential.Minio.Location, - } - if minioUrl.Scheme == "http" { - opts.Secure = false - } else if minioUrl.Scheme == "https" { - opts.Secure = true - } - minioClient, err = minio.New(minioUrl.Host, opts) - if err != nil { - return nil, errs.Wrap(err, "minioInit: failed to create MinIO client") - } - return minioClient, nil + return &Third{discov: discov, Client: client, conn: conn, GrafanaUrl: grafanaUrl} } diff --git a/pkg/rpcclient/user.go b/pkg/rpcclient/user.go index 0cd55bda1..5154f6f73 100644 --- a/pkg/rpcclient/user.go +++ b/pkg/rpcclient/user.go @@ -25,27 +25,25 @@ import ( "github.com/OpenIMSDK/tools/utils" "google.golang.org/grpc" - "github.com/openimsdk/open-im-server/v3/pkg/authverify" - "github.com/openimsdk/open-im-server/v3/pkg/common/config" util "github.com/openimsdk/open-im-server/v3/pkg/util/genutil" ) // User represents a structure holding connection details for the User RPC client. type User struct { - conn grpc.ClientConnInterface - Client user.UserClient - Discov discoveryregistry.SvcDiscoveryRegistry - Config *config.GlobalConfig + conn grpc.ClientConnInterface + Client user.UserClient + Discov discoveryregistry.SvcDiscoveryRegistry + MessageGateWayRpcName string } // NewUser initializes and returns a User instance based on the provided service discovery registry. -func NewUser(discov discoveryregistry.SvcDiscoveryRegistry, config *config.GlobalConfig) *User { - conn, err := discov.GetConn(context.Background(), config.RpcRegisterName.OpenImUserName) +func NewUser(discov discoveryregistry.SvcDiscoveryRegistry, rpcRegisterName, messageGateWayRpcName string) *User { + conn, err := discov.GetConn(context.Background(), rpcRegisterName) if err != nil { util.ExitWithError(err) } client := user.NewUserClient(conn) - return &User{Discov: discov, Client: client, conn: conn, Config: config} + return &User{Discov: discov, Client: client, conn: conn, MessageGateWayRpcName: messageGateWayRpcName} } // UserRpcClient represents the structure for a User RPC client. @@ -58,8 +56,8 @@ func NewUserRpcClientByUser(user *User) *UserRpcClient { } // NewUserRpcClient initializes a UserRpcClient based on the provided service discovery registry. -func NewUserRpcClient(client discoveryregistry.SvcDiscoveryRegistry, config *config.GlobalConfig) UserRpcClient { - return UserRpcClient(*NewUser(client, config)) +func NewUserRpcClient(client discoveryregistry.SvcDiscoveryRegistry, rpcRegisterName string) UserRpcClient { + return UserRpcClient(*NewUser(client, rpcRegisterName, "")) } // GetUsersInfo retrieves information for multiple users based on their user IDs. @@ -156,15 +154,6 @@ func (u *UserRpcClient) GetUserGlobalMsgRecvOpt(ctx context.Context, userID stri return resp.GlobalRecvMsgOpt, nil } -// Access verifies the access rights for the provided user ID. -func (u *UserRpcClient) Access(ctx context.Context, ownerUserID string) error { - _, err := u.GetUserInfo(ctx, ownerUserID) - if err != nil { - return err - } - return authverify.CheckAccessV3(ctx, ownerUserID, u.Config) -} - // GetAllUserIDs retrieves all user IDs with pagination options. func (u *UserRpcClient) GetAllUserIDs(ctx context.Context, pageNumber, showNumber int32) ([]string, error) { resp, err := u.Client.GetAllUserID(ctx, &user.GetAllUserIDReq{Pagination: &sdkws.RequestPagination{PageNumber: pageNumber, ShowNumber: showNumber}})