diff --git a/internal/rpc/auth/auth.go b/internal/rpc/auth/auth.go index 57dad178a..b78cb9b1e 100644 --- a/internal/rpc/auth/auth.go +++ b/internal/rpc/auth/auth.go @@ -16,6 +16,7 @@ package auth import ( "context" + "github.com/openimsdk/tools/db/redisutil" "github.com/openimsdk/open-im-server/v3/pkg/authverify" "github.com/openimsdk/open-im-server/v3/pkg/common/config" @@ -43,7 +44,7 @@ type authServer struct { } func Start(ctx context.Context, config *config.GlobalConfig, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error { - rdb, err := cache.NewRedis(ctx, &config.Redis) + rdb, err := redisutil.NewRedisClient(ctx, config.Redis.Build()) if err != nil { return err } diff --git a/internal/rpc/conversation/conversaion.go b/internal/rpc/conversation/conversaion.go index e3dd6ee24..3ef0236af 100644 --- a/internal/rpc/conversation/conversaion.go +++ b/internal/rpc/conversation/conversaion.go @@ -17,6 +17,7 @@ package conversation import ( "context" "errors" + "github.com/openimsdk/tools/db/redisutil" "sort" "github.com/openimsdk/open-im-server/v3/pkg/common/config" @@ -25,7 +26,6 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/common/db/controller" "github.com/openimsdk/open-im-server/v3/pkg/common/db/mgo" tablerelation "github.com/openimsdk/open-im-server/v3/pkg/common/db/table/relation" - "github.com/openimsdk/open-im-server/v3/pkg/common/db/unrelation" "github.com/openimsdk/open-im-server/v3/pkg/common/servererrs" "github.com/openimsdk/open-im-server/v3/pkg/rpcclient" "github.com/openimsdk/open-im-server/v3/pkg/rpcclient/notification" @@ -49,15 +49,15 @@ type conversationServer struct { } func Start(ctx context.Context, config *config.GlobalConfig, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error { - rdb, err := cache.NewRedis(ctx, &config.Redis) + mgocli, err := mongoutil.NewMongoDB(ctx, config.Mongo.Build()) if err != nil { return err } - mongo, err := unrelation.NewMongoDB(ctx, &config.Mongo) + rdb, err := redisutil.NewRedisClient(ctx, config.Redis.Build()) if err != nil { return err } - conversationDB, err := mgo.NewConversationMongo(mongo.GetDatabase(config.Mongo.Database)) + conversationDB, err := mgo.NewConversationMongo(mgocli.GetDB()) if err != nil { return err } @@ -69,7 +69,7 @@ func Start(ctx context.Context, config *config.GlobalConfig, client discovery.Sv user: &userRpcClient, conversationNotificationSender: notification.NewConversationNotificationSender(&config.Notification, &msgRpcClient), groupRpcClient: &groupRpcClient, - conversationDatabase: controller.NewConversationDatabase(conversationDB, cache.NewConversationRedis(rdb, cache.GetDefaultOpt(), conversationDB), mongoutil.NewMongo(mongo.GetClient())), + conversationDatabase: controller.NewConversationDatabase(conversationDB, cache.NewConversationRedis(rdb, cache.GetDefaultOpt(), conversationDB), mgocli.GetTx()), }) return nil } diff --git a/internal/rpc/friend/friend.go b/internal/rpc/friend/friend.go index 16848e16c..32e05c1d1 100644 --- a/internal/rpc/friend/friend.go +++ b/internal/rpc/friend/friend.go @@ -16,6 +16,7 @@ package friend import ( "context" + "github.com/openimsdk/tools/db/redisutil" "github.com/openimsdk/open-im-server/v3/pkg/authverify" "github.com/openimsdk/open-im-server/v3/pkg/common/config" @@ -24,7 +25,6 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/common/db/controller" "github.com/openimsdk/open-im-server/v3/pkg/common/db/mgo" tablerelation "github.com/openimsdk/open-im-server/v3/pkg/common/db/table/relation" - "github.com/openimsdk/open-im-server/v3/pkg/common/db/unrelation" "github.com/openimsdk/open-im-server/v3/pkg/common/servererrs" "github.com/openimsdk/open-im-server/v3/pkg/rpcclient" "github.com/openimsdk/open-im-server/v3/pkg/rpcclient/notification" @@ -49,29 +49,26 @@ type friendServer struct { } func Start(ctx context.Context, config *config.GlobalConfig, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error { - // Initialize MongoDB - mongo, err := unrelation.NewMongoDB(ctx, &config.Mongo) + mgocli, err := mongoutil.NewMongoDB(ctx, config.Mongo.Build()) if err != nil { return err } - - // Initialize Redis - rdb, err := cache.NewRedis(ctx, &config.Redis) + rdb, err := redisutil.NewRedisClient(ctx, config.Redis.Build()) if err != nil { return err } - friendMongoDB, err := mgo.NewFriendMongo(mongo.GetDatabase(config.Mongo.Database)) + friendMongoDB, err := mgo.NewFriendMongo(mgocli.GetDB()) if err != nil { return err } - friendRequestMongoDB, err := mgo.NewFriendRequestMongo(mongo.GetDatabase(config.Mongo.Database)) + friendRequestMongoDB, err := mgo.NewFriendRequestMongo(mgocli.GetDB()) if err != nil { return err } - blackMongoDB, err := mgo.NewBlackMongo(mongo.GetDatabase(config.Mongo.Database)) + blackMongoDB, err := mgo.NewBlackMongo(mgocli.GetDB()) if err != nil { return err } @@ -93,7 +90,7 @@ func Start(ctx context.Context, config *config.GlobalConfig, client discovery.Sv friendMongoDB, friendRequestMongoDB, cache.NewFriendCacheRedis(rdb, friendMongoDB, cache.GetDefaultOpt()), - mongoutil.NewMongo(mongo.GetClient()), + mgocli.GetTx(), ), blackDatabase: controller.NewBlackDatabase( blackMongoDB, diff --git a/internal/tools/msg.go b/internal/tools/msg.go index aa1e2e3cb..af9940754 100644 --- a/internal/tools/msg.go +++ b/internal/tools/msg.go @@ -17,6 +17,7 @@ package tools import ( "context" "fmt" + "github.com/openimsdk/tools/db/redisutil" "math" "math/rand" @@ -24,7 +25,6 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/common/db/cache" "github.com/openimsdk/open-im-server/v3/pkg/common/db/controller" "github.com/openimsdk/open-im-server/v3/pkg/common/db/mgo" - "github.com/openimsdk/open-im-server/v3/pkg/common/db/unrelation" kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister" "github.com/openimsdk/open-im-server/v3/pkg/rpcclient" "github.com/openimsdk/open-im-server/v3/pkg/rpcclient/notification" @@ -65,11 +65,11 @@ func NewMsgTool(msgDatabase controller.CommonMsgDatabase, userDatabase controlle } func InitMsgTool(ctx context.Context, config *config.GlobalConfig) (*MsgTool, error) { - rdb, err := cache.NewRedis(ctx, &config.Redis) + mgocli, err := mongoutil.NewMongoDB(ctx, config.Mongo.Build()) if err != nil { return nil, err } - mongo, err := unrelation.NewMongoDB(ctx, &config.Mongo) + rdb, err := redisutil.NewRedisClient(ctx, config.Redis.Build()) if err != nil { return nil, err } @@ -78,43 +78,42 @@ func InitMsgTool(ctx context.Context, config *config.GlobalConfig) (*MsgTool, er return nil, err } discov.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, "round_robin"))) - userDB, err := mgo.NewUserMongo(mongo.GetDatabase(config.Mongo.Database)) + userDB, err := mgo.NewUserMongo(mgocli.GetDB()) if err != nil { return nil, err } - msgDatabase, err := controller.InitCommonMsgDatabase(rdb, mongo.GetDatabase(config.Mongo.Database), config) + msgDatabase, err := controller.InitCommonMsgDatabase(rdb, mgocli.GetDB(), config) if err != nil { return nil, err } - userMongoDB := mgo.NewUserMongoDriver(mongo.GetDatabase(config.Mongo.Database)) - ctxTx := mongoutil.NewMongo(mongo.GetClient()) + userMongoDB := mgo.NewUserMongoDriver(mgocli.GetDB()) userDatabase := controller.NewUserDatabase( userDB, cache.NewUserCacheRedis(rdb, userDB, cache.GetDefaultOpt()), - ctxTx, + mgocli.GetTx(), userMongoDB, ) - groupDB, err := mgo.NewGroupMongo(mongo.GetDatabase(config.Mongo.Database)) + groupDB, err := mgo.NewGroupMongo(mgocli.GetDB()) if err != nil { return nil, err } - groupMemberDB, err := mgo.NewGroupMember(mongo.GetDatabase(config.Mongo.Database)) + groupMemberDB, err := mgo.NewGroupMember(mgocli.GetDB()) if err != nil { return nil, err } - groupRequestDB, err := mgo.NewGroupRequestMgo(mongo.GetDatabase(config.Mongo.Database)) + groupRequestDB, err := mgo.NewGroupRequestMgo(mgocli.GetDB()) if err != nil { return nil, err } - conversationDB, err := mgo.NewConversationMongo(mongo.GetDatabase(config.Mongo.Database)) + conversationDB, err := mgo.NewConversationMongo(mgocli.GetDB()) if err != nil { return nil, err } - groupDatabase := controller.NewGroupDatabase(rdb, groupDB, groupMemberDB, groupRequestDB, ctxTx, nil) + groupDatabase := controller.NewGroupDatabase(rdb, groupDB, groupMemberDB, groupRequestDB, mgocli.GetTx(), nil) conversationDatabase := controller.NewConversationDatabase( conversationDB, cache.NewConversationRedis(rdb, cache.GetDefaultOpt(), conversationDB), - ctxTx, + mgocli.GetTx(), ) msgRpcClient := rpcclient.NewMessageRpcClient(discov, config.RpcRegisterName.OpenImMsgName) msgNotificationSender := notification.NewMsgNotificationSender(config, rpcclient.WithRpcClient(&msgRpcClient)) diff --git a/pkg/common/db/cache/init_redis.go b/pkg/common/db/cache/init_redis.go deleted file mode 100644 index 35a83cf9b..000000000 --- a/pkg/common/db/cache/init_redis.go +++ /dev/null @@ -1,109 +0,0 @@ -// Copyright © 2023 OpenIM. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package cache - -import ( - "context" - "errors" - "fmt" - "os" - "strings" - "time" - - "github.com/openimsdk/open-im-server/v3/pkg/common/config" - "github.com/openimsdk/tools/errs" - "github.com/openimsdk/tools/log" - "github.com/openimsdk/tools/mw/specialerror" - "github.com/redis/go-redis/v9" -) - -var ( - // Singleton pattern. - redisClient redis.UniversalClient -) - -const ( - maxRetry = 10 // number of retries -) - -// NewRedis Initialize redis connection. -func NewRedis(ctx context.Context, redisConf *config.Redis) (redis.UniversalClient, error) { - if redisClient != nil { - return redisClient, nil - } - - // Read configuration from environment variables - overrideConfigFromEnv(redisConf) - - if len(redisConf.Address) == 0 { - return nil, errs.Wrap(errors.New("redis address is empty")) - } - specialerror.AddReplace(redis.Nil, errs.ErrRecordNotFound) - var rdb redis.UniversalClient - if len(redisConf.Address) > 1 || redisConf.ClusterMode { - rdb = redis.NewClusterClient(&redis.ClusterOptions{ - Addrs: redisConf.Address, - Username: redisConf.Username, - Password: redisConf.Password, // no password set - PoolSize: 50, - MaxRetries: maxRetry, - }) - } else { - rdb = redis.NewClient(&redis.Options{ - Addr: redisConf.Address[0], - Username: redisConf.Username, - Password: redisConf.Password, - DB: 0, // use default DB - PoolSize: 100, // connection pool size - MaxRetries: maxRetry, - }) - } - - var err error - ctx, cancel := context.WithTimeout(ctx, time.Second*10) - defer cancel() - err = rdb.Ping(ctx).Err() - if err != nil { - errMsg := fmt.Sprintf("address:%s, username:%s, password:%s, clusterMode:%t, enablePipeline:%t", redisConf.Address, redisConf.Username, - redisConf.Password, redisConf.ClusterMode, redisConf.EnablePipeline) - return nil, errs.WrapMsg(err, errMsg) - } - redisClient = rdb - log.CInfo(ctx, "REDIS connected successfully", "address", redisConf.Address, "username", redisConf.Username, "password", redisConf.Password, "clusterMode", redisConf.ClusterMode, "enablePipeline", redisConf.EnablePipeline) - return rdb, err -} - -// overrideConfigFromEnv overrides configuration fields with environment variables if present. -func overrideConfigFromEnv(redis *config.Redis) { - if envAddr := os.Getenv("REDIS_ADDRESS"); envAddr != "" { - if envPort := os.Getenv("REDIS_PORT"); envPort != "" { - addresses := strings.Split(envAddr, ",") - for i, addr := range addresses { - addresses[i] = addr + ":" + envPort - } - redis.Address = addresses - } else { - redis.Address = strings.Split(envAddr, ",") - } - } - - if envUser := os.Getenv("REDIS_USERNAME"); envUser != "" { - redis.Username = envUser - } - - if envPass := os.Getenv("REDIS_PASSWORD"); envPass != "" { - redis.Password = envPass - } -} diff --git a/pkg/common/db/unrelation/doc.go b/pkg/common/db/unrelation/doc.go deleted file mode 100644 index a6a696306..000000000 --- a/pkg/common/db/unrelation/doc.go +++ /dev/null @@ -1 +0,0 @@ -package unrelation // import "github.com/openimsdk/open-im-server/v3/pkg/common/db/unrelation" diff --git a/pkg/common/db/unrelation/mongo.go b/pkg/common/db/unrelation/mongo.go deleted file mode 100644 index 4d6653a53..000000000 --- a/pkg/common/db/unrelation/mongo.go +++ /dev/null @@ -1,127 +0,0 @@ -// Copyright © 2023 OpenIM. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package unrelation - -import ( - "context" - "fmt" - "os" - "strings" - "time" - - "github.com/openimsdk/open-im-server/v3/pkg/common/config" - "github.com/openimsdk/tools/errs" - "github.com/openimsdk/tools/log" - "github.com/openimsdk/tools/mw/specialerror" - "go.mongodb.org/mongo-driver/mongo" - "go.mongodb.org/mongo-driver/mongo/options" -) - -const ( - maxRetry = 10 // number of retries - mongoConnTimeout = 10 * time.Second -) - -type Mongo struct { - db *mongo.Client - mongoConf *config.Mongo -} - -// NewMongoDB Initialize MongoDB connection. -func NewMongoDB(ctx context.Context, mongoConf *config.Mongo) (*Mongo, error) { - specialerror.AddReplace(mongo.ErrNoDocuments, errs.ErrRecordNotFound) - uri := buildMongoURI(mongoConf) - - var mongoClient *mongo.Client - var err error - - // Retry connecting to MongoDB - for i := 0; i <= maxRetry; i++ { - ctx, cancel := context.WithTimeout(ctx, mongoConnTimeout) - defer cancel() - mongoClient, err = mongo.Connect(ctx, options.Client().ApplyURI(uri)) - if err == nil { - if err = mongoClient.Ping(ctx, nil); err != nil { - return nil, errs.WrapMsg(err, uri) - } - log.CInfo(ctx, "MONGODB connected successfully", "uri", uri) - return &Mongo{db: mongoClient, mongoConf: mongoConf}, nil - } - if shouldRetry(err) { - time.Sleep(time.Second) // exponential backoff could be implemented here - continue - } - } - return nil, errs.WrapMsg(err, uri) -} - -func buildMongoURI(mongoConf *config.Mongo) string { - uri := os.Getenv("MONGO_URI") - if uri != "" { - return uri - } - - if mongoConf.Uri != "" { - return mongoConf.Uri - } - - username := os.Getenv("MONGO_OPENIM_USERNAME") - password := os.Getenv("MONGO_OPENIM_PASSWORD") - address := os.Getenv("MONGO_ADDRESS") - port := os.Getenv("MONGO_PORT") - database := os.Getenv("MONGO_DATABASE") - maxPoolSize := os.Getenv("MONGO_MAX_POOL_SIZE") - - if username == "" { - username = mongoConf.Username - } - if password == "" { - password = mongoConf.Password - } - if address == "" { - address = strings.Join(mongoConf.Address, ",") - } else if port != "" { - address = fmt.Sprintf("%s:%s", address, port) - } - if database == "" { - database = mongoConf.Database - } - if maxPoolSize == "" { - maxPoolSize = fmt.Sprint(mongoConf.MaxPoolSize) - } - - if username != "" && password != "" { - - return fmt.Sprintf("mongodb://%s:%s@%s/%s?maxPoolSize=%s", username, password, address, database, maxPoolSize) - } - return fmt.Sprintf("mongodb://%s/%s?maxPoolSize=%s", address, database, maxPoolSize) -} - -func shouldRetry(err error) bool { - if cmdErr, ok := err.(mongo.CommandError); ok { - return cmdErr.Code != 13 && cmdErr.Code != 18 - } - return true -} - -// GetClient returns the MongoDB client. -func (m *Mongo) GetClient() *mongo.Client { - return m.db -} - -// GetDatabase returns the specific database from MongoDB. -func (m *Mongo) GetDatabase(database string) *mongo.Database { - return m.db.Database(database) -}