refactor: extract nested structures in the config.

pull/2100/head
Gordon 2 years ago
parent 8d3402982d
commit 1f563fde3a

@ -56,7 +56,7 @@ func Start(config *config.GlobalConfig, port int, proPort int) error {
err := "port or proPort is empty:" + strconv.Itoa(port) + "," + strconv.Itoa(proPort) err := "port or proPort is empty:" + strconv.Itoa(port) + "," + strconv.Itoa(proPort)
return errs.Wrap(fmt.Errorf(err)) return errs.Wrap(fmt.Errorf(err))
} }
rdb, err := cache.NewRedis(config) rdb, err := cache.NewRedis(&config.Redis)
if err != nil { if err != nil {
return err return err
} }
@ -139,13 +139,13 @@ func newGinRouter(disCov discoveryregistry.SvcDiscoveryRegistry, rdb redis.Unive
} }
r.Use(gin.Recovery(), mw.CorsHandler(), mw.GinParseOperationID()) r.Use(gin.Recovery(), mw.CorsHandler(), mw.GinParseOperationID())
// init rpc client here // init rpc client here
userRpc := rpcclient.NewUser(disCov, config) userRpc := rpcclient.NewUser(disCov, config.RpcRegisterName.OpenImUserName, config.RpcRegisterName.OpenImMessageGatewayName)
groupRpc := rpcclient.NewGroup(disCov, config) groupRpc := rpcclient.NewGroup(disCov, config.RpcRegisterName.OpenImGroupName)
friendRpc := rpcclient.NewFriend(disCov, config) friendRpc := rpcclient.NewFriend(disCov, config.RpcRegisterName.OpenImFriendName)
messageRpc := rpcclient.NewMessage(disCov, config) messageRpc := rpcclient.NewMessage(disCov, config.RpcRegisterName.OpenImMsgName)
conversationRpc := rpcclient.NewConversation(disCov, config) conversationRpc := rpcclient.NewConversation(disCov, config.RpcRegisterName.OpenImConversationName)
authRpc := rpcclient.NewAuth(disCov, config) authRpc := rpcclient.NewAuth(disCov, config.RpcRegisterName.OpenImAuthName)
thirdRpc := rpcclient.NewThird(disCov, config) thirdRpc := rpcclient.NewThird(disCov, config.RpcRegisterName.OpenImThirdName, config.Prometheus.GrafanaUrl)
u := NewUserApi(*userRpc) u := NewUserApi(*userRpc)
m := NewMessageApi(messageRpc, userRpc) m := NewMessageApi(messageRpc, userRpc)
@ -313,10 +313,9 @@ func newGinRouter(disCov discoveryregistry.SvcDiscoveryRegistry, rdb redis.Unive
func GinParseToken(rdb redis.UniversalClient, config *config.GlobalConfig) gin.HandlerFunc { func GinParseToken(rdb redis.UniversalClient, config *config.GlobalConfig) gin.HandlerFunc {
dataBase := controller.NewAuthDatabase( dataBase := controller.NewAuthDatabase(
cache.NewMsgCacheModel(rdb, config), cache.NewMsgCacheModel(rdb, config.MsgCacheTimeout, &config.Redis),
config.Secret, config.Secret,
config.TokenPolicy.Expire, config.TokenPolicy.Expire,
config,
) )
return func(c *gin.Context) { return func(c *gin.Context) {
switch c.Request.Method { switch c.Request.Method {

@ -127,5 +127,5 @@ func (o *ThirdApi) SearchLogs(c *gin.Context) {
} }
func (o *ThirdApi) GetPrometheus(c *gin.Context) { func (o *ThirdApi) GetPrometheus(c *gin.Context) {
c.Redirect(http.StatusFound, o.Config.Prometheus.GrafanaUrl) c.Redirect(http.StatusFound, o.GrafanaUrl)
} }

@ -70,7 +70,7 @@ func (u *UserApi) GetUsersOnlineStatus(c *gin.Context) {
apiresp.GinError(c, err) apiresp.GinError(c, err)
return return
} }
conns, err := u.Discov.GetConns(c, u.Config.RpcRegisterName.OpenImMessageGatewayName) conns, err := u.Discov.GetConns(c, u.MessageGateWayRpcName)
if err != nil { if err != nil {
apiresp.GinError(c, err) apiresp.GinError(c, err)
return return
@ -134,7 +134,7 @@ func (u *UserApi) GetUsersOnlineTokenDetail(c *gin.Context) {
apiresp.GinError(c, errs.ErrArgs.WithDetail(err.Error()).Wrap()) apiresp.GinError(c, errs.ErrArgs.WithDetail(err.Error()).Wrap())
return return
} }
conns, err := u.Discov.GetConns(c, u.Config.RpcRegisterName.OpenImMessageGatewayName) conns, err := u.Discov.GetConns(c, u.MessageGateWayRpcName)
if err != nil { if err != nil {
apiresp.GinError(c, err) apiresp.GinError(c, err)
return return

@ -32,12 +32,12 @@ import (
) )
func (s *Server) InitServer(config *config.GlobalConfig, disCov discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) error { func (s *Server) InitServer(config *config.GlobalConfig, disCov discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) error {
rdb, err := cache.NewRedis(config) rdb, err := cache.NewRedis(&config.Redis)
if err != nil { if err != nil {
return err return err
} }
msgModel := cache.NewMsgCacheModel(rdb, config) msgModel := cache.NewMsgCacheModel(rdb, config.MsgCacheTimeout, &config.Redis)
s.LongConnServer.SetDiscoveryRegistry(disCov, config) s.LongConnServer.SetDiscoveryRegistry(disCov, config)
s.LongConnServer.SetCacheHandler(msgModel) s.LongConnServer.SetCacheHandler(msgModel)
msggateway.RegisterMsgGatewayServer(server, s) msggateway.RegisterMsgGatewayServer(server, s)

@ -107,9 +107,9 @@ type GrpcHandler struct {
validate *validator.Validate validate *validator.Validate
} }
func NewGrpcHandler(validate *validator.Validate, client discoveryregistry.SvcDiscoveryRegistry, config *config.GlobalConfig) *GrpcHandler { func NewGrpcHandler(validate *validator.Validate, client discoveryregistry.SvcDiscoveryRegistry, rpcRegisterName *config.RpcRegisterName) *GrpcHandler {
msgRpcClient := rpcclient.NewMessageRpcClient(client, config) msgRpcClient := rpcclient.NewMessageRpcClient(client, rpcRegisterName.OpenImMsgName)
pushRpcClient := rpcclient.NewPushRpcClient(client, config) pushRpcClient := rpcclient.NewPushRpcClient(client, rpcRegisterName.OpenImPushName)
return &GrpcHandler{ return &GrpcHandler{
msgRpcClient: &msgRpcClient, msgRpcClient: &msgRpcClient,
pushClient: &pushRpcClient, validate: validate, pushClient: &pushRpcClient, validate: validate,

@ -95,8 +95,8 @@ type kickHandler struct {
} }
func (ws *WsServer) SetDiscoveryRegistry(disCov discoveryregistry.SvcDiscoveryRegistry, config *config.GlobalConfig) { func (ws *WsServer) SetDiscoveryRegistry(disCov discoveryregistry.SvcDiscoveryRegistry, config *config.GlobalConfig) {
ws.MessageHandler = NewGrpcHandler(ws.validate, disCov, config) ws.MessageHandler = NewGrpcHandler(ws.validate, disCov, &config.RpcRegisterName)
u := rpcclient.NewUserRpcClient(disCov, config) u := rpcclient.NewUserRpcClient(disCov, config.RpcRegisterName.OpenImUserName)
ws.userClient = &u ws.userClient = &u
ws.disCov = disCov ws.disCov = disCov
} }

@ -55,7 +55,7 @@ type MsgTransfer struct {
} }
func StartTransfer(config *config.GlobalConfig, prometheusPort int) error { func StartTransfer(config *config.GlobalConfig, prometheusPort int) error {
rdb, err := cache.NewRedis(config) rdb, err := cache.NewRedis(&config.Redis)
if err != nil { if err != nil {
return err return err
} }

@ -39,32 +39,32 @@ const (
) )
// NewRedis Initialize redis connection. // NewRedis Initialize redis connection.
func NewRedis(config *config.GlobalConfig) (redis.UniversalClient, error) { func NewRedis(redisConf *config.Redis) (redis.UniversalClient, error) {
if redisClient != nil { if redisClient != nil {
return redisClient, nil return redisClient, nil
} }
// Read configuration from environment variables // Read configuration from environment variables
overrideConfigFromEnv(config) overrideConfigFromEnv(redisConf)
if len(config.Redis.Address) == 0 { if len(redisConf.Address) == 0 {
return nil, errs.Wrap(errors.New("redis address is empty")) return nil, errs.Wrap(errors.New("redis address is empty"))
} }
specialerror.AddReplace(redis.Nil, errs.ErrRecordNotFound) specialerror.AddReplace(redis.Nil, errs.ErrRecordNotFound)
var rdb redis.UniversalClient var rdb redis.UniversalClient
if len(config.Redis.Address) > 1 || config.Redis.ClusterMode { if len(redisConf.Address) > 1 || redisConf.ClusterMode {
rdb = redis.NewClusterClient(&redis.ClusterOptions{ rdb = redis.NewClusterClient(&redis.ClusterOptions{
Addrs: config.Redis.Address, Addrs: redisConf.Address,
Username: config.Redis.Username, Username: redisConf.Username,
Password: config.Redis.Password, // no password set Password: redisConf.Password, // no password set
PoolSize: 50, PoolSize: 50,
MaxRetries: maxRetry, MaxRetries: maxRetry,
}) })
} else { } else {
rdb = redis.NewClient(&redis.Options{ rdb = redis.NewClient(&redis.Options{
Addr: config.Redis.Address[0], Addr: redisConf.Address[0],
Username: config.Redis.Username, Username: redisConf.Username,
Password: config.Redis.Password, Password: redisConf.Password,
DB: 0, // use default DB DB: 0, // use default DB
PoolSize: 100, // connection pool size PoolSize: 100, // connection pool size
MaxRetries: maxRetry, MaxRetries: maxRetry,
@ -76,8 +76,8 @@ func NewRedis(config *config.GlobalConfig) (redis.UniversalClient, error) {
defer cancel() defer cancel()
err = rdb.Ping(ctx).Err() err = rdb.Ping(ctx).Err()
if err != nil { if err != nil {
errMsg := fmt.Sprintf("address:%s, username:%s, password:%s, clusterMode:%t, enablePipeline:%t", config.Redis.Address, config.Redis.Username, errMsg := fmt.Sprintf("address:%s, username:%s, password:%s, clusterMode:%t, enablePipeline:%t", redisConf.Address, redisConf.Username,
config.Redis.Password, config.Redis.ClusterMode, config.Redis.EnablePipeline) redisConf.Password, redisConf.ClusterMode, redisConf.EnablePipeline)
return nil, errs.Wrap(err, errMsg) return nil, errs.Wrap(err, errMsg)
} }
redisClient = rdb redisClient = rdb
@ -85,24 +85,24 @@ func NewRedis(config *config.GlobalConfig) (redis.UniversalClient, error) {
} }
// overrideConfigFromEnv overrides configuration fields with environment variables if present. // overrideConfigFromEnv overrides configuration fields with environment variables if present.
func overrideConfigFromEnv(config *config.GlobalConfig) { func overrideConfigFromEnv(redis *config.Redis) {
if envAddr := os.Getenv("REDIS_ADDRESS"); envAddr != "" { if envAddr := os.Getenv("REDIS_ADDRESS"); envAddr != "" {
if envPort := os.Getenv("REDIS_PORT"); envPort != "" { if envPort := os.Getenv("REDIS_PORT"); envPort != "" {
addresses := strings.Split(envAddr, ",") addresses := strings.Split(envAddr, ",")
for i, addr := range addresses { for i, addr := range addresses {
addresses[i] = addr + ":" + envPort addresses[i] = addr + ":" + envPort
} }
config.Redis.Address = addresses redis.Address = addresses
} else { } else {
config.Redis.Address = strings.Split(envAddr, ",") redis.Address = strings.Split(envAddr, ",")
} }
} }
if envUser := os.Getenv("REDIS_USERNAME"); envUser != "" { if envUser := os.Getenv("REDIS_USERNAME"); envUser != "" {
config.Redis.Username = envUser redis.Username = envUser
} }
if envPass := os.Getenv("REDIS_PASSWORD"); envPass != "" { if envPass := os.Getenv("REDIS_PASSWORD"); envPass != "" {
config.Redis.Password = envPass redis.Password = envPass
} }
} }

@ -122,14 +122,15 @@ type MsgModel interface {
UnLockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error UnLockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error
} }
func NewMsgCacheModel(client redis.UniversalClient, config *config.GlobalConfig) MsgModel { func NewMsgCacheModel(client redis.UniversalClient, msgCacheTimeout int, redisConf *config.Redis) MsgModel {
return &msgCache{rdb: client, config: config} return &msgCache{rdb: client, msgCacheTimeout: msgCacheTimeout, redisConf: redisConf}
} }
type msgCache struct { type msgCache struct {
metaCache metaCache
rdb redis.UniversalClient rdb redis.UniversalClient
config *config.GlobalConfig msgCacheTimeout int
redisConf *config.Redis
} }
func (c *msgCache) getMaxSeqKey(conversationID string) string { func (c *msgCache) getMaxSeqKey(conversationID string) string {
@ -317,7 +318,7 @@ func (c *msgCache) allMessageCacheKey(conversationID string) string {
} }
func (c *msgCache) GetMessagesBySeq(ctx context.Context, conversationID string, seqs []int64) (seqMsgs []*sdkws.MsgData, failedSeqs []int64, err error) { func (c *msgCache) GetMessagesBySeq(ctx context.Context, conversationID string, seqs []int64) (seqMsgs []*sdkws.MsgData, failedSeqs []int64, err error) {
if c.config.Redis.EnablePipeline { if c.redisConf.EnablePipeline {
return c.PipeGetMessagesBySeq(ctx, conversationID, seqs) return c.PipeGetMessagesBySeq(ctx, conversationID, seqs)
} }
@ -418,7 +419,7 @@ func (c *msgCache) ParallelGetMessagesBySeq(ctx context.Context, conversationID
} }
func (c *msgCache) SetMessageToCache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (int, error) { func (c *msgCache) SetMessageToCache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (int, error) {
if c.config.Redis.EnablePipeline { if c.redisConf.EnablePipeline {
return c.PipeSetMessageToCache(ctx, conversationID, msgs) return c.PipeSetMessageToCache(ctx, conversationID, msgs)
} }
return c.ParallelSetMessageToCache(ctx, conversationID, msgs) return c.ParallelSetMessageToCache(ctx, conversationID, msgs)
@ -433,7 +434,7 @@ func (c *msgCache) PipeSetMessageToCache(ctx context.Context, conversationID str
} }
key := c.getMessageCacheKey(conversationID, msg.Seq) key := c.getMessageCacheKey(conversationID, msg.Seq)
_ = pipe.Set(ctx, key, s, time.Duration(c.config.MsgCacheTimeout)*time.Second) _ = pipe.Set(ctx, key, s, time.Duration(c.msgCacheTimeout)*time.Second)
} }
results, err := pipe.Exec(ctx) results, err := pipe.Exec(ctx)
@ -463,7 +464,7 @@ func (c *msgCache) ParallelSetMessageToCache(ctx context.Context, conversationID
} }
key := c.getMessageCacheKey(conversationID, msg.Seq) key := c.getMessageCacheKey(conversationID, msg.Seq)
if err := c.rdb.Set(ctx, key, s, time.Duration(c.config.MsgCacheTimeout)*time.Second).Err(); err != nil { if err := c.rdb.Set(ctx, key, s, time.Duration(c.msgCacheTimeout)*time.Second).Err(); err != nil {
return errs.Wrap(err) return errs.Wrap(err)
} }
return nil return nil
@ -498,10 +499,10 @@ func (c *msgCache) UserDeleteMsgs(ctx context.Context, conversationID string, se
if err != nil { if err != nil {
return errs.Wrap(err) return errs.Wrap(err)
} }
if err := c.rdb.Expire(ctx, delUserListKey, time.Duration(c.config.MsgCacheTimeout)*time.Second).Err(); err != nil { if err := c.rdb.Expire(ctx, delUserListKey, time.Duration(c.msgCacheTimeout)*time.Second).Err(); err != nil {
return errs.Wrap(err) return errs.Wrap(err)
} }
if err := c.rdb.Expire(ctx, userDelListKey, time.Duration(c.config.MsgCacheTimeout)*time.Second).Err(); err != nil { if err := c.rdb.Expire(ctx, userDelListKey, time.Duration(c.msgCacheTimeout)*time.Second).Err(); err != nil {
return errs.Wrap(err) return errs.Wrap(err)
} }
} }
@ -606,7 +607,7 @@ func (c *msgCache) DelUserDeleteMsgsList(ctx context.Context, conversationID str
} }
func (c *msgCache) DeleteMessages(ctx context.Context, conversationID string, seqs []int64) error { func (c *msgCache) DeleteMessages(ctx context.Context, conversationID string, seqs []int64) error {
if c.config.Redis.EnablePipeline { if c.redisConf.EnablePipeline {
return c.PipeDeleteMessages(ctx, conversationID, seqs) return c.PipeDeleteMessages(ctx, conversationID, seqs)
} }
@ -688,7 +689,7 @@ func (c *msgCache) DelMsgFromCache(ctx context.Context, userID string, seqs []in
if err != nil { if err != nil {
return errs.Wrap(err) return errs.Wrap(err)
} }
if err := c.rdb.Set(ctx, key, s, time.Duration(c.config.MsgCacheTimeout)*time.Second).Err(); err != nil { if err := c.rdb.Set(ctx, key, s, time.Duration(c.msgCacheTimeout)*time.Second).Err(); err != nil {
return errs.Wrap(err) return errs.Wrap(err)
} }
} }

@ -23,7 +23,6 @@ import (
"github.com/golang-jwt/jwt/v4" "github.com/golang-jwt/jwt/v4"
"github.com/openimsdk/open-im-server/v3/pkg/authverify" "github.com/openimsdk/open-im-server/v3/pkg/authverify"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/cache" "github.com/openimsdk/open-im-server/v3/pkg/common/db/cache"
) )
@ -38,11 +37,10 @@ type authDatabase struct {
cache cache.MsgModel cache cache.MsgModel
accessSecret string accessSecret string
accessExpire int64 accessExpire int64
config *config.GlobalConfig
} }
func NewAuthDatabase(cache cache.MsgModel, accessSecret string, accessExpire int64, config *config.GlobalConfig) AuthDatabase { func NewAuthDatabase(cache cache.MsgModel, accessSecret string, accessExpire int64) AuthDatabase {
return &authDatabase{cache: cache, accessSecret: accessSecret, accessExpire: accessExpire, config: config} return &authDatabase{cache: cache, accessSecret: accessSecret, accessExpire: accessExpire}
} }
// If the result is empty. // If the result is empty.
@ -58,7 +56,7 @@ func (a *authDatabase) CreateToken(ctx context.Context, userID string, platformI
} }
var deleteTokenKey []string var deleteTokenKey []string
for k, v := range tokens { for k, v := range tokens {
_, err = tokenverify.GetClaimFromToken(k, authverify.Secret(a.config.Secret)) _, err = tokenverify.GetClaimFromToken(k, authverify.Secret(a.accessSecret))
if err != nil || v != constant.NormalToken { if err != nil || v != constant.NormalToken {
deleteTokenKey = append(deleteTokenKey, k) deleteTokenKey = append(deleteTokenKey, k)
} }

@ -28,17 +28,18 @@ import (
type ServiceAddresses map[string][]int type ServiceAddresses map[string][]int
func getServiceAddresses(config *config2.GlobalConfig) ServiceAddresses { func getServiceAddresses(rpcRegisterName *config2.RpcRegisterName,
rpcPort *config2.RpcPort, longConnSvrPort []int) ServiceAddresses {
return ServiceAddresses{ return ServiceAddresses{
config.RpcRegisterName.OpenImUserName: config.RpcPort.OpenImUserPort, rpcRegisterName.OpenImUserName: rpcPort.OpenImUserPort,
config.RpcRegisterName.OpenImFriendName: config.RpcPort.OpenImFriendPort, rpcRegisterName.OpenImFriendName: rpcPort.OpenImFriendPort,
config.RpcRegisterName.OpenImMsgName: config.RpcPort.OpenImMessagePort, rpcRegisterName.OpenImMsgName: rpcPort.OpenImMessagePort,
config.RpcRegisterName.OpenImMessageGatewayName: config.LongConnSvr.OpenImMessageGatewayPort, rpcRegisterName.OpenImMessageGatewayName: longConnSvrPort,
config.RpcRegisterName.OpenImGroupName: config.RpcPort.OpenImGroupPort, rpcRegisterName.OpenImGroupName: rpcPort.OpenImGroupPort,
config.RpcRegisterName.OpenImAuthName: config.RpcPort.OpenImAuthPort, rpcRegisterName.OpenImAuthName: rpcPort.OpenImAuthPort,
config.RpcRegisterName.OpenImPushName: config.RpcPort.OpenImPushPort, rpcRegisterName.OpenImPushName: rpcPort.OpenImPushPort,
config.RpcRegisterName.OpenImConversationName: config.RpcPort.OpenImConversationPort, rpcRegisterName.OpenImConversationName: rpcPort.OpenImConversationPort,
config.RpcRegisterName.OpenImThirdName: config.RpcPort.OpenImThirdPort, rpcRegisterName.OpenImThirdName: rpcPort.OpenImThirdPort,
} }
} }
@ -96,7 +97,8 @@ func (cd *ConnDirect) GetConns(ctx context.Context,
if conns, exists := cd.conns[serviceName]; exists { if conns, exists := cd.conns[serviceName]; exists {
return conns, nil return conns, nil
} }
ports := getServiceAddresses(cd.config)[serviceName] ports := getServiceAddresses(&cd.config.RpcRegisterName,
&cd.config.RpcPort, cd.config.LongConnSvr.OpenImMessageGatewayPort)[serviceName]
var connections []*grpc.ClientConn var connections []*grpc.ClientConn
for _, port := range ports { for _, port := range ports {
conn, err := cd.dialServiceWithoutResolver(ctx, fmt.Sprintf(cd.config.Rpc.ListenIP+":%d", port), append(cd.additionalOpts, opts...)...) conn, err := cd.dialServiceWithoutResolver(ctx, fmt.Sprintf(cd.config.Rpc.ListenIP+":%d", port), append(cd.additionalOpts, opts...)...)
@ -114,7 +116,8 @@ func (cd *ConnDirect) GetConns(ctx context.Context,
func (cd *ConnDirect) GetConn(ctx context.Context, serviceName string, opts ...grpc.DialOption) (*grpc.ClientConn, error) { func (cd *ConnDirect) GetConn(ctx context.Context, serviceName string, opts ...grpc.DialOption) (*grpc.ClientConn, error) {
// Get service addresses // Get service addresses
addresses := getServiceAddresses(cd.config) addresses := getServiceAddresses(&cd.config.RpcRegisterName,
&cd.config.RpcPort, cd.config.LongConnSvr.OpenImMessageGatewayPort)
address, ok := addresses[serviceName] address, ok := addresses[serviceName]
if !ok { if !ok {
return nil, errs.Wrap(errors.New("unknown service name"), "serviceName", serviceName) return nil, errs.Wrap(errors.New("unknown service name"), "serviceName", serviceName)

@ -36,7 +36,7 @@ func NewDiscoveryRegister(config *config.GlobalConfig) (discoveryregistry.SvcDis
switch config.Envs.Discovery { switch config.Envs.Discovery {
case "zookeeper": case "zookeeper":
return zookeeper.NewZookeeperDiscoveryRegister(config) return zookeeper.NewZookeeperDiscoveryRegister(&config.Zookeeper)
case "k8s": case "k8s":
return kubernetes.NewK8sDiscoveryRegister(config.RpcRegisterName.OpenImMessageGatewayName) return kubernetes.NewK8sDiscoveryRegister(config.RpcRegisterName.OpenImMessageGatewayName)
case "direct": case "direct":

@ -29,11 +29,11 @@ import (
) )
// NewZookeeperDiscoveryRegister creates a new instance of ZookeeperDR for Zookeeper service discovery and registration. // NewZookeeperDiscoveryRegister creates a new instance of ZookeeperDR for Zookeeper service discovery and registration.
func NewZookeeperDiscoveryRegister(config *config.GlobalConfig) (discoveryregistry.SvcDiscoveryRegistry, error) { func NewZookeeperDiscoveryRegister(zkConf *config.Zookeeper) (discoveryregistry.SvcDiscoveryRegistry, error) {
schema := getEnv("ZOOKEEPER_SCHEMA", config.Zookeeper.Schema) schema := getEnv("ZOOKEEPER_SCHEMA", zkConf.Schema)
zkAddr := getZkAddrFromEnv(config.Zookeeper.ZkAddr) zkAddr := getZkAddrFromEnv(zkConf.ZkAddr)
username := getEnv("ZOOKEEPER_USERNAME", config.Zookeeper.Username) username := getEnv("ZOOKEEPER_USERNAME", zkConf.Username)
password := getEnv("ZOOKEEPER_PASSWORD", config.Zookeeper.Password) password := getEnv("ZOOKEEPER_PASSWORD", zkConf.Password)
zk, err := openkeeper.NewClient( zk, err := openkeeper.NewClient(
zkAddr, zkAddr,
@ -47,10 +47,10 @@ func NewZookeeperDiscoveryRegister(config *config.GlobalConfig) (discoveryregist
if err != nil { if err != nil {
uriFormat := "address:%s, username:%s, password:%s, schema:%s." uriFormat := "address:%s, username:%s, password:%s, schema:%s."
errInfo := fmt.Sprintf(uriFormat, errInfo := fmt.Sprintf(uriFormat,
config.Zookeeper.ZkAddr, zkConf.ZkAddr,
config.Zookeeper.Username, zkConf.Username,
config.Zookeeper.Password, zkConf.Password,
config.Zookeeper.Schema) zkConf.Schema)
return nil, errs.Wrap(err, errInfo) return nil, errs.Wrap(err, errInfo)
} }
return zk, nil return zk, nil

@ -21,22 +21,20 @@ import (
"github.com/OpenIMSDK/tools/discoveryregistry" "github.com/OpenIMSDK/tools/discoveryregistry"
"google.golang.org/grpc" "google.golang.org/grpc"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
util "github.com/openimsdk/open-im-server/v3/pkg/util/genutil" util "github.com/openimsdk/open-im-server/v3/pkg/util/genutil"
) )
func NewAuth(discov discoveryregistry.SvcDiscoveryRegistry, config *config.GlobalConfig) *Auth { func NewAuth(discov discoveryregistry.SvcDiscoveryRegistry, rpcRegisterName string) *Auth {
conn, err := discov.GetConn(context.Background(), config.RpcRegisterName.OpenImAuthName) conn, err := discov.GetConn(context.Background(), rpcRegisterName)
if err != nil { if err != nil {
util.ExitWithError(err) util.ExitWithError(err)
} }
client := auth.NewAuthClient(conn) client := auth.NewAuthClient(conn)
return &Auth{discov: discov, conn: conn, Client: client, Config: config} return &Auth{discov: discov, conn: conn, Client: client}
} }
type Auth struct { type Auth struct {
conn grpc.ClientConnInterface conn grpc.ClientConnInterface
Client auth.AuthClient Client auth.AuthClient
discov discoveryregistry.SvcDiscoveryRegistry discov discoveryregistry.SvcDiscoveryRegistry
Config *config.GlobalConfig
} }

@ -34,19 +34,19 @@ type Conversation struct {
Config *config.GlobalConfig Config *config.GlobalConfig
} }
func NewConversation(discov discoveryregistry.SvcDiscoveryRegistry, config *config.GlobalConfig) *Conversation { func NewConversation(discov discoveryregistry.SvcDiscoveryRegistry, rpcRegisterName string) *Conversation {
conn, err := discov.GetConn(context.Background(), config.RpcRegisterName.OpenImConversationName) conn, err := discov.GetConn(context.Background(), rpcRegisterName)
if err != nil { if err != nil {
util.ExitWithError(err) util.ExitWithError(err)
} }
client := pbconversation.NewConversationClient(conn) client := pbconversation.NewConversationClient(conn)
return &Conversation{discov: discov, conn: conn, Client: client, Config: config} return &Conversation{discov: discov, conn: conn, Client: client}
} }
type ConversationRpcClient Conversation type ConversationRpcClient Conversation
func NewConversationRpcClient(discov discoveryregistry.SvcDiscoveryRegistry, config *config.GlobalConfig) ConversationRpcClient { func NewConversationRpcClient(discov discoveryregistry.SvcDiscoveryRegistry, rpcRegisterName string) ConversationRpcClient {
return ConversationRpcClient(*NewConversation(discov, config)) return ConversationRpcClient(*NewConversation(discov, rpcRegisterName))
} }
func (c *ConversationRpcClient) GetSingleConversationRecvMsgOpt(ctx context.Context, userID, conversationID string) (int32, error) { func (c *ConversationRpcClient) GetSingleConversationRecvMsgOpt(ctx context.Context, userID, conversationID string) (int32, error) {

@ -22,7 +22,6 @@ import (
"github.com/OpenIMSDK/tools/discoveryregistry" "github.com/OpenIMSDK/tools/discoveryregistry"
"google.golang.org/grpc" "google.golang.org/grpc"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
util "github.com/openimsdk/open-im-server/v3/pkg/util/genutil" util "github.com/openimsdk/open-im-server/v3/pkg/util/genutil"
) )
@ -30,22 +29,21 @@ type Friend struct {
conn grpc.ClientConnInterface conn grpc.ClientConnInterface
Client friend.FriendClient Client friend.FriendClient
discov discoveryregistry.SvcDiscoveryRegistry discov discoveryregistry.SvcDiscoveryRegistry
Config *config.GlobalConfig
} }
func NewFriend(discov discoveryregistry.SvcDiscoveryRegistry, config *config.GlobalConfig) *Friend { func NewFriend(discov discoveryregistry.SvcDiscoveryRegistry, rpcRegisterName string) *Friend {
conn, err := discov.GetConn(context.Background(), config.RpcRegisterName.OpenImFriendName) conn, err := discov.GetConn(context.Background(), rpcRegisterName)
if err != nil { if err != nil {
util.ExitWithError(err) util.ExitWithError(err)
} }
client := friend.NewFriendClient(conn) client := friend.NewFriendClient(conn)
return &Friend{discov: discov, conn: conn, Client: client, Config: config} return &Friend{discov: discov, conn: conn, Client: client}
} }
type FriendRpcClient Friend type FriendRpcClient Friend
func NewFriendRpcClient(discov discoveryregistry.SvcDiscoveryRegistry, config *config.GlobalConfig) FriendRpcClient { func NewFriendRpcClient(discov discoveryregistry.SvcDiscoveryRegistry, rpcRegisterName string) FriendRpcClient {
return FriendRpcClient(*NewFriend(discov, config)) return FriendRpcClient(*NewFriend(discov, rpcRegisterName))
} }
func (f *FriendRpcClient) GetFriendsInfo( func (f *FriendRpcClient) GetFriendsInfo(

@ -25,29 +25,27 @@ import (
"github.com/OpenIMSDK/tools/errs" "github.com/OpenIMSDK/tools/errs"
"github.com/OpenIMSDK/tools/utils" "github.com/OpenIMSDK/tools/utils"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
util "github.com/openimsdk/open-im-server/v3/pkg/util/genutil" util "github.com/openimsdk/open-im-server/v3/pkg/util/genutil"
) )
type Group struct { type Group struct {
Client group.GroupClient Client group.GroupClient
discov discoveryregistry.SvcDiscoveryRegistry discov discoveryregistry.SvcDiscoveryRegistry
Config *config.GlobalConfig
} }
func NewGroup(discov discoveryregistry.SvcDiscoveryRegistry, config *config.GlobalConfig) *Group { func NewGroup(discov discoveryregistry.SvcDiscoveryRegistry, rpcRegisterName string) *Group {
conn, err := discov.GetConn(context.Background(), config.RpcRegisterName.OpenImGroupName) conn, err := discov.GetConn(context.Background(), rpcRegisterName)
if err != nil { if err != nil {
util.ExitWithError(err) util.ExitWithError(err)
} }
client := group.NewGroupClient(conn) client := group.NewGroupClient(conn)
return &Group{discov: discov, Client: client, Config: config} return &Group{discov: discov, Client: client}
} }
type GroupRpcClient Group type GroupRpcClient Group
func NewGroupRpcClient(discov discoveryregistry.SvcDiscoveryRegistry, config *config.GlobalConfig) GroupRpcClient { func NewGroupRpcClient(discov discoveryregistry.SvcDiscoveryRegistry, rpcRegisterName string) GroupRpcClient {
return GroupRpcClient(*NewGroup(discov, config)) return GroupRpcClient(*NewGroup(discov, rpcRegisterName))
} }
func (g *GroupRpcClient) GetGroupInfos( func (g *GroupRpcClient) GetGroupInfos(

@ -131,22 +131,21 @@ type Message struct {
conn grpc.ClientConnInterface conn grpc.ClientConnInterface
Client msg.MsgClient Client msg.MsgClient
discov discoveryregistry.SvcDiscoveryRegistry discov discoveryregistry.SvcDiscoveryRegistry
Config *config.GlobalConfig
} }
func NewMessage(discov discoveryregistry.SvcDiscoveryRegistry, config *config.GlobalConfig) *Message { func NewMessage(discov discoveryregistry.SvcDiscoveryRegistry, rpcRegisterName string) *Message {
conn, err := discov.GetConn(context.Background(), config.RpcRegisterName.OpenImMsgName) conn, err := discov.GetConn(context.Background(), rpcRegisterName)
if err != nil { if err != nil {
util.ExitWithError(err) util.ExitWithError(err)
} }
client := msg.NewMsgClient(conn) client := msg.NewMsgClient(conn)
return &Message{discov: discov, conn: conn, Client: client, Config: config} return &Message{discov: discov, conn: conn, Client: client}
} }
type MessageRpcClient Message type MessageRpcClient Message
func NewMessageRpcClient(discov discoveryregistry.SvcDiscoveryRegistry, config *config.GlobalConfig) MessageRpcClient { func NewMessageRpcClient(discov discoveryregistry.SvcDiscoveryRegistry, rpcRegisterName string) MessageRpcClient {
return MessageRpcClient(*NewMessage(discov, config)) return MessageRpcClient(*NewMessage(discov, rpcRegisterName))
} }
// SendMsg sends a message through the gRPC client and returns the response. // SendMsg sends a message through the gRPC client and returns the response.

@ -21,7 +21,6 @@ import (
"github.com/OpenIMSDK/tools/discoveryregistry" "github.com/OpenIMSDK/tools/discoveryregistry"
"google.golang.org/grpc" "google.golang.org/grpc"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
util "github.com/openimsdk/open-im-server/v3/pkg/util/genutil" util "github.com/openimsdk/open-im-server/v3/pkg/util/genutil"
) )
@ -31,8 +30,8 @@ type Push struct {
discov discoveryregistry.SvcDiscoveryRegistry discov discoveryregistry.SvcDiscoveryRegistry
} }
func NewPush(discov discoveryregistry.SvcDiscoveryRegistry, config *config.GlobalConfig) *Push { func NewPush(discov discoveryregistry.SvcDiscoveryRegistry, rpcRegisterName string) *Push {
conn, err := discov.GetConn(context.Background(), config.RpcRegisterName.OpenImPushName) conn, err := discov.GetConn(context.Background(), rpcRegisterName)
if err != nil { if err != nil {
util.ExitWithError(err) util.ExitWithError(err)
} }
@ -45,8 +44,8 @@ func NewPush(discov discoveryregistry.SvcDiscoveryRegistry, config *config.Globa
type PushRpcClient Push type PushRpcClient Push
func NewPushRpcClient(discov discoveryregistry.SvcDiscoveryRegistry, config *config.GlobalConfig) PushRpcClient { func NewPushRpcClient(discov discoveryregistry.SvcDiscoveryRegistry, rpcRegisterName string) PushRpcClient {
return PushRpcClient(*NewPush(discov, config)) return PushRpcClient(*NewPush(discov, rpcRegisterName))
} }
func (p *PushRpcClient) DelUserPushToken(ctx context.Context, req *push.DelUserPushTokenReq) (*push.DelUserPushTokenResp, error) { func (p *PushRpcClient) DelUserPushToken(ctx context.Context, req *push.DelUserPushTokenReq) (*push.DelUserPushTokenResp, error) {

@ -16,16 +16,10 @@ package rpcclient
import ( import (
"context" "context"
"net/url"
"github.com/OpenIMSDK/protocol/third" "github.com/OpenIMSDK/protocol/third"
"github.com/OpenIMSDK/tools/discoveryregistry" "github.com/OpenIMSDK/tools/discoveryregistry"
"github.com/OpenIMSDK/tools/errs"
"github.com/minio/minio-go/v7"
"github.com/minio/minio-go/v7/pkg/credentials"
"google.golang.org/grpc" "google.golang.org/grpc"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
util "github.com/openimsdk/open-im-server/v3/pkg/util/genutil" util "github.com/openimsdk/open-im-server/v3/pkg/util/genutil"
) )
@ -33,42 +27,17 @@ type Third struct {
conn grpc.ClientConnInterface conn grpc.ClientConnInterface
Client third.ThirdClient Client third.ThirdClient
discov discoveryregistry.SvcDiscoveryRegistry discov discoveryregistry.SvcDiscoveryRegistry
MinioClient *minio.Client GrafanaUrl string
Config *config.GlobalConfig
} }
func NewThird(discov discoveryregistry.SvcDiscoveryRegistry, config *config.GlobalConfig) *Third { func NewThird(discov discoveryregistry.SvcDiscoveryRegistry, rpcRegisterName, grafanaUrl string) *Third {
conn, err := discov.GetConn(context.Background(), config.RpcRegisterName.OpenImThirdName) conn, err := discov.GetConn(context.Background(), rpcRegisterName)
if err != nil { if err != nil {
util.ExitWithError(err) util.ExitWithError(err)
} }
client := third.NewThirdClient(conn) client := third.NewThirdClient(conn)
minioClient, err := minioInit(config)
if err != nil { if err != nil {
util.ExitWithError(err) util.ExitWithError(err)
} }
return &Third{discov: discov, Client: client, conn: conn, MinioClient: minioClient, Config: config} return &Third{discov: discov, Client: client, conn: conn, GrafanaUrl: grafanaUrl}
}
func minioInit(config *config.GlobalConfig) (*minio.Client, error) {
minioClient := &minio.Client{}
initUrl := config.Object.Minio.Endpoint
minioUrl, err := url.Parse(initUrl)
if err != nil {
return nil, errs.Wrap(err, "minioInit: failed to parse MinIO endpoint URL")
}
opts := &minio.Options{
Creds: credentials.NewStaticV4(config.Object.Minio.AccessKeyID, config.Object.Minio.SecretAccessKey, ""),
// Region: config.Credential.Minio.Location,
}
if minioUrl.Scheme == "http" {
opts.Secure = false
} else if minioUrl.Scheme == "https" {
opts.Secure = true
}
minioClient, err = minio.New(minioUrl.Host, opts)
if err != nil {
return nil, errs.Wrap(err, "minioInit: failed to create MinIO client")
}
return minioClient, nil
} }

@ -25,8 +25,6 @@ import (
"github.com/OpenIMSDK/tools/utils" "github.com/OpenIMSDK/tools/utils"
"google.golang.org/grpc" "google.golang.org/grpc"
"github.com/openimsdk/open-im-server/v3/pkg/authverify"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
util "github.com/openimsdk/open-im-server/v3/pkg/util/genutil" util "github.com/openimsdk/open-im-server/v3/pkg/util/genutil"
) )
@ -35,17 +33,17 @@ type User struct {
conn grpc.ClientConnInterface conn grpc.ClientConnInterface
Client user.UserClient Client user.UserClient
Discov discoveryregistry.SvcDiscoveryRegistry Discov discoveryregistry.SvcDiscoveryRegistry
Config *config.GlobalConfig MessageGateWayRpcName string
} }
// NewUser initializes and returns a User instance based on the provided service discovery registry. // NewUser initializes and returns a User instance based on the provided service discovery registry.
func NewUser(discov discoveryregistry.SvcDiscoveryRegistry, config *config.GlobalConfig) *User { func NewUser(discov discoveryregistry.SvcDiscoveryRegistry, rpcRegisterName, messageGateWayRpcName string) *User {
conn, err := discov.GetConn(context.Background(), config.RpcRegisterName.OpenImUserName) conn, err := discov.GetConn(context.Background(), rpcRegisterName)
if err != nil { if err != nil {
util.ExitWithError(err) util.ExitWithError(err)
} }
client := user.NewUserClient(conn) client := user.NewUserClient(conn)
return &User{Discov: discov, Client: client, conn: conn, Config: config} return &User{Discov: discov, Client: client, conn: conn, MessageGateWayRpcName: messageGateWayRpcName}
} }
// UserRpcClient represents the structure for a User RPC client. // UserRpcClient represents the structure for a User RPC client.
@ -58,8 +56,8 @@ func NewUserRpcClientByUser(user *User) *UserRpcClient {
} }
// NewUserRpcClient initializes a UserRpcClient based on the provided service discovery registry. // NewUserRpcClient initializes a UserRpcClient based on the provided service discovery registry.
func NewUserRpcClient(client discoveryregistry.SvcDiscoveryRegistry, config *config.GlobalConfig) UserRpcClient { func NewUserRpcClient(client discoveryregistry.SvcDiscoveryRegistry, rpcRegisterName string) UserRpcClient {
return UserRpcClient(*NewUser(client, config)) return UserRpcClient(*NewUser(client, rpcRegisterName, ""))
} }
// GetUsersInfo retrieves information for multiple users based on their user IDs. // GetUsersInfo retrieves information for multiple users based on their user IDs.
@ -156,15 +154,6 @@ func (u *UserRpcClient) GetUserGlobalMsgRecvOpt(ctx context.Context, userID stri
return resp.GlobalRecvMsgOpt, nil return resp.GlobalRecvMsgOpt, nil
} }
// Access verifies the access rights for the provided user ID.
func (u *UserRpcClient) Access(ctx context.Context, ownerUserID string) error {
_, err := u.GetUserInfo(ctx, ownerUserID)
if err != nil {
return err
}
return authverify.CheckAccessV3(ctx, ownerUserID, u.Config)
}
// GetAllUserIDs retrieves all user IDs with pagination options. // GetAllUserIDs retrieves all user IDs with pagination options.
func (u *UserRpcClient) GetAllUserIDs(ctx context.Context, pageNumber, showNumber int32) ([]string, error) { func (u *UserRpcClient) GetAllUserIDs(ctx context.Context, pageNumber, showNumber int32) ([]string, error) {
resp, err := u.Client.GetAllUserID(ctx, &user.GetAllUserIDReq{Pagination: &sdkws.RequestPagination{PageNumber: pageNumber, ShowNumber: showNumber}}) resp, err := u.Client.GetAllUserID(ctx, &user.GetAllUserIDReq{Pagination: &sdkws.RequestPagination{PageNumber: pageNumber, ShowNumber: showNumber}})

Loading…
Cancel
Save