fix: redis mongo

pull/2148/head
withchao 2 years ago
parent a8c0250944
commit 7ee9e4cdde

@ -16,6 +16,7 @@ package auth
import ( import (
"context" "context"
"github.com/openimsdk/tools/db/redisutil"
"github.com/openimsdk/open-im-server/v3/pkg/authverify" "github.com/openimsdk/open-im-server/v3/pkg/authverify"
"github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/config"
@ -43,7 +44,7 @@ type authServer struct {
} }
func Start(ctx context.Context, config *config.GlobalConfig, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error { func Start(ctx context.Context, config *config.GlobalConfig, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error {
rdb, err := cache.NewRedis(ctx, &config.Redis) rdb, err := redisutil.NewRedisClient(ctx, config.Redis.Build())
if err != nil { if err != nil {
return err return err
} }

@ -17,6 +17,7 @@ package conversation
import ( import (
"context" "context"
"errors" "errors"
"github.com/openimsdk/tools/db/redisutil"
"sort" "sort"
"github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/config"
@ -25,7 +26,6 @@ import (
"github.com/openimsdk/open-im-server/v3/pkg/common/db/controller" "github.com/openimsdk/open-im-server/v3/pkg/common/db/controller"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/mgo" "github.com/openimsdk/open-im-server/v3/pkg/common/db/mgo"
tablerelation "github.com/openimsdk/open-im-server/v3/pkg/common/db/table/relation" tablerelation "github.com/openimsdk/open-im-server/v3/pkg/common/db/table/relation"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/unrelation"
"github.com/openimsdk/open-im-server/v3/pkg/common/servererrs" "github.com/openimsdk/open-im-server/v3/pkg/common/servererrs"
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient" "github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient/notification" "github.com/openimsdk/open-im-server/v3/pkg/rpcclient/notification"
@ -49,15 +49,15 @@ type conversationServer struct {
} }
func Start(ctx context.Context, config *config.GlobalConfig, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error { func Start(ctx context.Context, config *config.GlobalConfig, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error {
rdb, err := cache.NewRedis(ctx, &config.Redis) mgocli, err := mongoutil.NewMongoDB(ctx, config.Mongo.Build())
if err != nil { if err != nil {
return err return err
} }
mongo, err := unrelation.NewMongoDB(ctx, &config.Mongo) rdb, err := redisutil.NewRedisClient(ctx, config.Redis.Build())
if err != nil { if err != nil {
return err return err
} }
conversationDB, err := mgo.NewConversationMongo(mongo.GetDatabase(config.Mongo.Database)) conversationDB, err := mgo.NewConversationMongo(mgocli.GetDB())
if err != nil { if err != nil {
return err return err
} }
@ -69,7 +69,7 @@ func Start(ctx context.Context, config *config.GlobalConfig, client discovery.Sv
user: &userRpcClient, user: &userRpcClient,
conversationNotificationSender: notification.NewConversationNotificationSender(&config.Notification, &msgRpcClient), conversationNotificationSender: notification.NewConversationNotificationSender(&config.Notification, &msgRpcClient),
groupRpcClient: &groupRpcClient, groupRpcClient: &groupRpcClient,
conversationDatabase: controller.NewConversationDatabase(conversationDB, cache.NewConversationRedis(rdb, cache.GetDefaultOpt(), conversationDB), mongoutil.NewMongo(mongo.GetClient())), conversationDatabase: controller.NewConversationDatabase(conversationDB, cache.NewConversationRedis(rdb, cache.GetDefaultOpt(), conversationDB), mgocli.GetTx()),
}) })
return nil return nil
} }

@ -16,6 +16,7 @@ package friend
import ( import (
"context" "context"
"github.com/openimsdk/tools/db/redisutil"
"github.com/openimsdk/open-im-server/v3/pkg/authverify" "github.com/openimsdk/open-im-server/v3/pkg/authverify"
"github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/config"
@ -24,7 +25,6 @@ import (
"github.com/openimsdk/open-im-server/v3/pkg/common/db/controller" "github.com/openimsdk/open-im-server/v3/pkg/common/db/controller"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/mgo" "github.com/openimsdk/open-im-server/v3/pkg/common/db/mgo"
tablerelation "github.com/openimsdk/open-im-server/v3/pkg/common/db/table/relation" tablerelation "github.com/openimsdk/open-im-server/v3/pkg/common/db/table/relation"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/unrelation"
"github.com/openimsdk/open-im-server/v3/pkg/common/servererrs" "github.com/openimsdk/open-im-server/v3/pkg/common/servererrs"
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient" "github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient/notification" "github.com/openimsdk/open-im-server/v3/pkg/rpcclient/notification"
@ -49,29 +49,26 @@ type friendServer struct {
} }
func Start(ctx context.Context, config *config.GlobalConfig, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error { func Start(ctx context.Context, config *config.GlobalConfig, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error {
// Initialize MongoDB mgocli, err := mongoutil.NewMongoDB(ctx, config.Mongo.Build())
mongo, err := unrelation.NewMongoDB(ctx, &config.Mongo)
if err != nil { if err != nil {
return err return err
} }
rdb, err := redisutil.NewRedisClient(ctx, config.Redis.Build())
// Initialize Redis
rdb, err := cache.NewRedis(ctx, &config.Redis)
if err != nil { if err != nil {
return err return err
} }
friendMongoDB, err := mgo.NewFriendMongo(mongo.GetDatabase(config.Mongo.Database)) friendMongoDB, err := mgo.NewFriendMongo(mgocli.GetDB())
if err != nil { if err != nil {
return err return err
} }
friendRequestMongoDB, err := mgo.NewFriendRequestMongo(mongo.GetDatabase(config.Mongo.Database)) friendRequestMongoDB, err := mgo.NewFriendRequestMongo(mgocli.GetDB())
if err != nil { if err != nil {
return err return err
} }
blackMongoDB, err := mgo.NewBlackMongo(mongo.GetDatabase(config.Mongo.Database)) blackMongoDB, err := mgo.NewBlackMongo(mgocli.GetDB())
if err != nil { if err != nil {
return err return err
} }
@ -93,7 +90,7 @@ func Start(ctx context.Context, config *config.GlobalConfig, client discovery.Sv
friendMongoDB, friendMongoDB,
friendRequestMongoDB, friendRequestMongoDB,
cache.NewFriendCacheRedis(rdb, friendMongoDB, cache.GetDefaultOpt()), cache.NewFriendCacheRedis(rdb, friendMongoDB, cache.GetDefaultOpt()),
mongoutil.NewMongo(mongo.GetClient()), mgocli.GetTx(),
), ),
blackDatabase: controller.NewBlackDatabase( blackDatabase: controller.NewBlackDatabase(
blackMongoDB, blackMongoDB,

@ -17,6 +17,7 @@ package tools
import ( import (
"context" "context"
"fmt" "fmt"
"github.com/openimsdk/tools/db/redisutil"
"math" "math"
"math/rand" "math/rand"
@ -24,7 +25,6 @@ import (
"github.com/openimsdk/open-im-server/v3/pkg/common/db/cache" "github.com/openimsdk/open-im-server/v3/pkg/common/db/cache"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/controller" "github.com/openimsdk/open-im-server/v3/pkg/common/db/controller"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/mgo" "github.com/openimsdk/open-im-server/v3/pkg/common/db/mgo"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/unrelation"
kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister" kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister"
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient" "github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient/notification" "github.com/openimsdk/open-im-server/v3/pkg/rpcclient/notification"
@ -65,11 +65,11 @@ func NewMsgTool(msgDatabase controller.CommonMsgDatabase, userDatabase controlle
} }
func InitMsgTool(ctx context.Context, config *config.GlobalConfig) (*MsgTool, error) { func InitMsgTool(ctx context.Context, config *config.GlobalConfig) (*MsgTool, error) {
rdb, err := cache.NewRedis(ctx, &config.Redis) mgocli, err := mongoutil.NewMongoDB(ctx, config.Mongo.Build())
if err != nil { if err != nil {
return nil, err return nil, err
} }
mongo, err := unrelation.NewMongoDB(ctx, &config.Mongo) rdb, err := redisutil.NewRedisClient(ctx, config.Redis.Build())
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -78,43 +78,42 @@ func InitMsgTool(ctx context.Context, config *config.GlobalConfig) (*MsgTool, er
return nil, err return nil, err
} }
discov.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, "round_robin"))) discov.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, "round_robin")))
userDB, err := mgo.NewUserMongo(mongo.GetDatabase(config.Mongo.Database)) userDB, err := mgo.NewUserMongo(mgocli.GetDB())
if err != nil { if err != nil {
return nil, err return nil, err
} }
msgDatabase, err := controller.InitCommonMsgDatabase(rdb, mongo.GetDatabase(config.Mongo.Database), config) msgDatabase, err := controller.InitCommonMsgDatabase(rdb, mgocli.GetDB(), config)
if err != nil { if err != nil {
return nil, err return nil, err
} }
userMongoDB := mgo.NewUserMongoDriver(mongo.GetDatabase(config.Mongo.Database)) userMongoDB := mgo.NewUserMongoDriver(mgocli.GetDB())
ctxTx := mongoutil.NewMongo(mongo.GetClient())
userDatabase := controller.NewUserDatabase( userDatabase := controller.NewUserDatabase(
userDB, userDB,
cache.NewUserCacheRedis(rdb, userDB, cache.GetDefaultOpt()), cache.NewUserCacheRedis(rdb, userDB, cache.GetDefaultOpt()),
ctxTx, mgocli.GetTx(),
userMongoDB, userMongoDB,
) )
groupDB, err := mgo.NewGroupMongo(mongo.GetDatabase(config.Mongo.Database)) groupDB, err := mgo.NewGroupMongo(mgocli.GetDB())
if err != nil { if err != nil {
return nil, err return nil, err
} }
groupMemberDB, err := mgo.NewGroupMember(mongo.GetDatabase(config.Mongo.Database)) groupMemberDB, err := mgo.NewGroupMember(mgocli.GetDB())
if err != nil { if err != nil {
return nil, err return nil, err
} }
groupRequestDB, err := mgo.NewGroupRequestMgo(mongo.GetDatabase(config.Mongo.Database)) groupRequestDB, err := mgo.NewGroupRequestMgo(mgocli.GetDB())
if err != nil { if err != nil {
return nil, err return nil, err
} }
conversationDB, err := mgo.NewConversationMongo(mongo.GetDatabase(config.Mongo.Database)) conversationDB, err := mgo.NewConversationMongo(mgocli.GetDB())
if err != nil { if err != nil {
return nil, err return nil, err
} }
groupDatabase := controller.NewGroupDatabase(rdb, groupDB, groupMemberDB, groupRequestDB, ctxTx, nil) groupDatabase := controller.NewGroupDatabase(rdb, groupDB, groupMemberDB, groupRequestDB, mgocli.GetTx(), nil)
conversationDatabase := controller.NewConversationDatabase( conversationDatabase := controller.NewConversationDatabase(
conversationDB, conversationDB,
cache.NewConversationRedis(rdb, cache.GetDefaultOpt(), conversationDB), cache.NewConversationRedis(rdb, cache.GetDefaultOpt(), conversationDB),
ctxTx, mgocli.GetTx(),
) )
msgRpcClient := rpcclient.NewMessageRpcClient(discov, config.RpcRegisterName.OpenImMsgName) msgRpcClient := rpcclient.NewMessageRpcClient(discov, config.RpcRegisterName.OpenImMsgName)
msgNotificationSender := notification.NewMsgNotificationSender(config, rpcclient.WithRpcClient(&msgRpcClient)) msgNotificationSender := notification.NewMsgNotificationSender(config, rpcclient.WithRpcClient(&msgRpcClient))

@ -1,109 +0,0 @@
// Copyright © 2023 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package cache
import (
"context"
"errors"
"fmt"
"os"
"strings"
"time"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/mw/specialerror"
"github.com/redis/go-redis/v9"
)
var (
// Singleton pattern.
redisClient redis.UniversalClient
)
const (
maxRetry = 10 // number of retries
)
// NewRedis Initialize redis connection.
func NewRedis(ctx context.Context, redisConf *config.Redis) (redis.UniversalClient, error) {
if redisClient != nil {
return redisClient, nil
}
// Read configuration from environment variables
overrideConfigFromEnv(redisConf)
if len(redisConf.Address) == 0 {
return nil, errs.Wrap(errors.New("redis address is empty"))
}
specialerror.AddReplace(redis.Nil, errs.ErrRecordNotFound)
var rdb redis.UniversalClient
if len(redisConf.Address) > 1 || redisConf.ClusterMode {
rdb = redis.NewClusterClient(&redis.ClusterOptions{
Addrs: redisConf.Address,
Username: redisConf.Username,
Password: redisConf.Password, // no password set
PoolSize: 50,
MaxRetries: maxRetry,
})
} else {
rdb = redis.NewClient(&redis.Options{
Addr: redisConf.Address[0],
Username: redisConf.Username,
Password: redisConf.Password,
DB: 0, // use default DB
PoolSize: 100, // connection pool size
MaxRetries: maxRetry,
})
}
var err error
ctx, cancel := context.WithTimeout(ctx, time.Second*10)
defer cancel()
err = rdb.Ping(ctx).Err()
if err != nil {
errMsg := fmt.Sprintf("address:%s, username:%s, password:%s, clusterMode:%t, enablePipeline:%t", redisConf.Address, redisConf.Username,
redisConf.Password, redisConf.ClusterMode, redisConf.EnablePipeline)
return nil, errs.WrapMsg(err, errMsg)
}
redisClient = rdb
log.CInfo(ctx, "REDIS connected successfully", "address", redisConf.Address, "username", redisConf.Username, "password", redisConf.Password, "clusterMode", redisConf.ClusterMode, "enablePipeline", redisConf.EnablePipeline)
return rdb, err
}
// overrideConfigFromEnv overrides configuration fields with environment variables if present.
func overrideConfigFromEnv(redis *config.Redis) {
if envAddr := os.Getenv("REDIS_ADDRESS"); envAddr != "" {
if envPort := os.Getenv("REDIS_PORT"); envPort != "" {
addresses := strings.Split(envAddr, ",")
for i, addr := range addresses {
addresses[i] = addr + ":" + envPort
}
redis.Address = addresses
} else {
redis.Address = strings.Split(envAddr, ",")
}
}
if envUser := os.Getenv("REDIS_USERNAME"); envUser != "" {
redis.Username = envUser
}
if envPass := os.Getenv("REDIS_PASSWORD"); envPass != "" {
redis.Password = envPass
}
}

@ -1 +0,0 @@
package unrelation // import "github.com/openimsdk/open-im-server/v3/pkg/common/db/unrelation"

@ -1,127 +0,0 @@
// Copyright © 2023 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package unrelation
import (
"context"
"fmt"
"os"
"strings"
"time"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/mw/specialerror"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
)
const (
maxRetry = 10 // number of retries
mongoConnTimeout = 10 * time.Second
)
type Mongo struct {
db *mongo.Client
mongoConf *config.Mongo
}
// NewMongoDB Initialize MongoDB connection.
func NewMongoDB(ctx context.Context, mongoConf *config.Mongo) (*Mongo, error) {
specialerror.AddReplace(mongo.ErrNoDocuments, errs.ErrRecordNotFound)
uri := buildMongoURI(mongoConf)
var mongoClient *mongo.Client
var err error
// Retry connecting to MongoDB
for i := 0; i <= maxRetry; i++ {
ctx, cancel := context.WithTimeout(ctx, mongoConnTimeout)
defer cancel()
mongoClient, err = mongo.Connect(ctx, options.Client().ApplyURI(uri))
if err == nil {
if err = mongoClient.Ping(ctx, nil); err != nil {
return nil, errs.WrapMsg(err, uri)
}
log.CInfo(ctx, "MONGODB connected successfully", "uri", uri)
return &Mongo{db: mongoClient, mongoConf: mongoConf}, nil
}
if shouldRetry(err) {
time.Sleep(time.Second) // exponential backoff could be implemented here
continue
}
}
return nil, errs.WrapMsg(err, uri)
}
func buildMongoURI(mongoConf *config.Mongo) string {
uri := os.Getenv("MONGO_URI")
if uri != "" {
return uri
}
if mongoConf.Uri != "" {
return mongoConf.Uri
}
username := os.Getenv("MONGO_OPENIM_USERNAME")
password := os.Getenv("MONGO_OPENIM_PASSWORD")
address := os.Getenv("MONGO_ADDRESS")
port := os.Getenv("MONGO_PORT")
database := os.Getenv("MONGO_DATABASE")
maxPoolSize := os.Getenv("MONGO_MAX_POOL_SIZE")
if username == "" {
username = mongoConf.Username
}
if password == "" {
password = mongoConf.Password
}
if address == "" {
address = strings.Join(mongoConf.Address, ",")
} else if port != "" {
address = fmt.Sprintf("%s:%s", address, port)
}
if database == "" {
database = mongoConf.Database
}
if maxPoolSize == "" {
maxPoolSize = fmt.Sprint(mongoConf.MaxPoolSize)
}
if username != "" && password != "" {
return fmt.Sprintf("mongodb://%s:%s@%s/%s?maxPoolSize=%s", username, password, address, database, maxPoolSize)
}
return fmt.Sprintf("mongodb://%s/%s?maxPoolSize=%s", address, database, maxPoolSize)
}
func shouldRetry(err error) bool {
if cmdErr, ok := err.(mongo.CommandError); ok {
return cmdErr.Code != 13 && cmdErr.Code != 18
}
return true
}
// GetClient returns the MongoDB client.
func (m *Mongo) GetClient() *mongo.Client {
return m.db
}
// GetDatabase returns the specific database from MongoDB.
func (m *Mongo) GetDatabase(database string) *mongo.Database {
return m.db.Database(database)
}
Loading…
Cancel
Save