From 35b1f75036c9939a894e6187c73ac2190ca07778 Mon Sep 17 00:00:00 2001 From: plutoyty <2631223275@qq.com> Date: Wed, 12 Jul 2023 19:15:39 +0800 Subject: [PATCH] Add retry mechanism to mongoDB, Redis, Kafka --- .gitignore | 1 + config/config.yaml | 2 +- pkg/common/db/cache/init_redis.go | 31 ++++++++++++------- pkg/common/db/cache/init_redis_test.go | 30 +++++++++++++++++++ pkg/common/db/unrelation/extend_msg.go | 6 ++-- pkg/common/db/unrelation/mongo.go | 41 +++++++++++++++++--------- pkg/common/http/http_client.go | 21 +++++++++---- pkg/common/kafka/producer.go | 27 ++++++++++++++--- pkg/common/mw/gin.go | 9 +++--- 9 files changed, 125 insertions(+), 43 deletions(-) create mode 100644 pkg/common/db/cache/init_redis_test.go diff --git a/.gitignore b/.gitignore index 6e29bad2c..cabe1a427 100644 --- a/.gitignore +++ b/.gitignore @@ -389,3 +389,4 @@ Sessionx.vim [._]*.un~ # End of https://www.toptal.com/developers/gitignore/api/go,git,vim,tags,test,emacs,backup,jetbrains +.idea \ No newline at end of file diff --git a/config/config.yaml b/config/config.yaml index ea6f8772d..4ff0e44b7 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -36,7 +36,7 @@ mysql: mongo: uri: #不为空则直接使用该值 address: [ 127.0.0.1:37017 ] #单机时为mongo地址,使用分片集群时,为mongos地址 - database: openIM_v3 #mongo db 默认即可 + database: openIM_v3 #mongo db 默认即可 username: root #用户名 password: openIM123 #密码 maxPoolSize: 100 diff --git a/pkg/common/db/cache/init_redis.go b/pkg/common/db/cache/init_redis.go index 72dfc8caf..be0431adf 100644 --- a/pkg/common/db/cache/init_redis.go +++ b/pkg/common/db/cache/init_redis.go @@ -27,6 +27,11 @@ import ( "github.com/OpenIMSDK/Open-IM-Server/pkg/errs" ) +const ( + maxRetry = 10 //number of retries +) + +// NewRedis Initialize redis connection func NewRedis() (redis.UniversalClient, error) { if len(config.Config.Redis.Address) == 0 { return nil, errors.New("redis address is empty") @@ -35,25 +40,29 @@ func NewRedis() (redis.UniversalClient, error) { var rdb redis.UniversalClient if len(config.Config.Redis.Address) > 1 { rdb = redis.NewClusterClient(&redis.ClusterOptions{ - Addrs: config.Config.Redis.Address, - Username: config.Config.Redis.Username, - Password: config.Config.Redis.Password, // no password set - PoolSize: 50, + Addrs: config.Config.Redis.Address, + Username: config.Config.Redis.Username, + Password: config.Config.Redis.Password, // no password set + PoolSize: 50, + MaxRetries: maxRetry, }) } else { rdb = redis.NewClient(&redis.Options{ - Addr: config.Config.Redis.Address[0], - Username: config.Config.Redis.Username, - Password: config.Config.Redis.Password, // no password set - DB: 0, // use default DB - PoolSize: 100, // 连接池大小 + Addr: config.Config.Redis.Address[0], + Username: config.Config.Redis.Username, + Password: config.Config.Redis.Password, // no password set + DB: 0, // use default DB + PoolSize: 100, // connection pool size + MaxRetries: maxRetry, }) } + + var err error = nil ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) defer cancel() - err := rdb.Ping(ctx).Err() + err = rdb.Ping(ctx).Err() if err != nil { return nil, fmt.Errorf("redis ping %w", err) } - return rdb, nil + return rdb, err } diff --git a/pkg/common/db/cache/init_redis_test.go b/pkg/common/db/cache/init_redis_test.go new file mode 100644 index 000000000..6f78a43bd --- /dev/null +++ b/pkg/common/db/cache/init_redis_test.go @@ -0,0 +1,30 @@ +// Copyright © 2023 OpenIM. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package cache + +import ( + "fmt" + "github.com/OpenIMSDK/Open-IM-Server/pkg/common/config" + "testing" +) + +//TestNewRedis Test redis connection +func TestNewRedis(t *testing.T) { + err := config.InitConfig("config_folder_path") + if err != nil { + fmt.Println("config load error") + return + } +} diff --git a/pkg/common/db/unrelation/extend_msg.go b/pkg/common/db/unrelation/extend_msg.go index 17e0b2e19..77f65cbd2 100644 --- a/pkg/common/db/unrelation/extend_msg.go +++ b/pkg/common/db/unrelation/extend_msg.go @@ -100,7 +100,7 @@ func (e *ExtendMsgSetMongoDriver) GetExtendMsgSet( return &setList[0], nil } -// first modify msg +// InsertExtendMsg first modify msg. func (e *ExtendMsgSetMongoDriver) InsertExtendMsg( ctx context.Context, conversationID string, @@ -130,7 +130,7 @@ func (e *ExtendMsgSetMongoDriver) InsertExtendMsg( return utils.Wrap(err, "") } -// insert or update +// InsertOrUpdateReactionExtendMsgSet insert or update. func (e *ExtendMsgSetMongoDriver) InsertOrUpdateReactionExtendMsgSet( ctx context.Context, conversationID string, @@ -163,7 +163,7 @@ func (e *ExtendMsgSetMongoDriver) InsertOrUpdateReactionExtendMsgSet( return utils.Wrap(err, "") } -// delete TypeKey +// DeleteReactionExtendMsgSet delete TypeKey. func (e *ExtendMsgSetMongoDriver) DeleteReactionExtendMsgSet( ctx context.Context, conversationID string, diff --git a/pkg/common/db/unrelation/mongo.go b/pkg/common/db/unrelation/mongo.go index 51b9e4b7e..911edef3c 100644 --- a/pkg/common/db/unrelation/mongo.go +++ b/pkg/common/db/unrelation/mongo.go @@ -31,19 +31,21 @@ import ( "github.com/OpenIMSDK/Open-IM-Server/pkg/utils" ) +const ( + maxRetry = 10 //number of retries +) + type Mongo struct { db *mongo.Client } +// NewMongo Initialize MongoDB connection func NewMongo() (*Mongo, error) { specialerror.AddReplace(mongo.ErrNoDocuments, errs.ErrRecordNotFound) - uri := "mongodb://sample.host:27017/?maxPoolSize=20&w=majority" + url := "mongodb://sample.host:27017/?maxPoolSize=20&w=majority" if config.Config.Mongo.Uri != "" { - // example: - // mongodb://$user:$password@mongo1.mongo:27017,mongo2.mongo:27017,mongo3.mongo:27017/$DBDatabase/?replicaSet=rs0&readPreference=secondary&authSource=admin&maxPoolSize=$DBMaxPoolSize - uri = config.Config.Mongo.Uri + url = config.Config.Mongo.Uri } else { - //mongodb://mongodb1.example.com:27317,mongodb2.example.com:27017/?replicaSet=mySet&authSource=authDB mongodbHosts := "" for i, v := range config.Config.Mongo.Address { if i == len(config.Config.Mongo.Address)-1 { @@ -53,23 +55,34 @@ func NewMongo() (*Mongo, error) { } } if config.Config.Mongo.Password != "" && config.Config.Mongo.Username != "" { - uri = fmt.Sprintf("mongodb://%s:%s@%s/%s?maxPoolSize=%d&authSource=admin", + url = fmt.Sprintf("mongodb://%s:%s@%s/%s?maxPoolSize=%d&authSource=admin", config.Config.Mongo.Username, config.Config.Mongo.Password, mongodbHosts, config.Config.Mongo.Database, config.Config.Mongo.MaxPoolSize) } else { - uri = fmt.Sprintf("mongodb://%s/%s/?maxPoolSize=%d&authSource=admin", + url = fmt.Sprintf("mongodb://%s/%s/?maxPoolSize=%d&authSource=admin", mongodbHosts, config.Config.Mongo.Database, config.Config.Mongo.MaxPoolSize) } } - fmt.Println("mongo:", uri) - ctx, cancel := context.WithTimeout(context.Background(), time.Second*60) - defer cancel() - mongoClient, err := mongo.Connect(ctx, options.Client().ApplyURI(uri)) - if err != nil { - return nil, err + fmt.Println("mongo:", url) + var mongoClient *mongo.Client + var err error = nil + for i := 0; i <= maxRetry; i++ { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + defer cancel() + mongoClient, err = mongo.Connect(ctx, options.Client().ApplyURI(url)) + if err == nil { + return &Mongo{db: mongoClient}, nil + } + if cmdErr, ok := err.(mongo.CommandError); ok { + if cmdErr.Code == 13 || cmdErr.Code == 18 { + return nil, err + } else { + fmt.Printf("Failed to connect to MongoDB: %s\n", err) + } + } } - return &Mongo{db: mongoClient}, nil + return nil, err } func (m *Mongo) GetClient() *mongo.Client { diff --git a/pkg/common/http/http_client.go b/pkg/common/http/http_client.go index 72e3fae62..153deb30e 100644 --- a/pkg/common/http/http_client.go +++ b/pkg/common/http/http_client.go @@ -1,9 +1,18 @@ -/* -** description(""). -** copyright('open-im,www.open-im.io'). -** author("fg,Gordon@tuoyun.net"). -** time(2021/5/27 10:31). - */package http +// Copyright © 2023 OpenIM. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package http import ( "bytes" diff --git a/pkg/common/kafka/producer.go b/pkg/common/kafka/producer.go index 4c4ebc460..a66ef3dba 100644 --- a/pkg/common/kafka/producer.go +++ b/pkg/common/kafka/producer.go @@ -17,12 +17,12 @@ package kafka import ( "context" "errors" - "github.com/OpenIMSDK/Open-IM-Server/pkg/common/config" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant" log "github.com/OpenIMSDK/Open-IM-Server/pkg/common/log" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/mcontext" "github.com/OpenIMSDK/Open-IM-Server/pkg/utils" + "time" "github.com/Shopify/sarama" "google.golang.org/protobuf/proto" @@ -30,6 +30,10 @@ import ( prome "github.com/OpenIMSDK/Open-IM-Server/pkg/common/prome" ) +const ( + maxRetry = 10 //number of retries +) + var errEmptyMsg = errors.New("binary msg is empty") type Producer struct { @@ -39,6 +43,7 @@ type Producer struct { producer sarama.SyncProducer } +// NewKafkaProducer Initialize kafka producer func NewKafkaProducer(addr []string, topic string) *Producer { p := Producer{} p.config = sarama.NewConfig() //Instantiate a sarama Config @@ -53,9 +58,23 @@ func NewKafkaProducer(addr []string, topic string) *Producer { } p.addr = addr p.topic = topic - producer, err := sarama.NewSyncProducer(p.addr, p.config) //Initialize the client - if err != nil { - panic(err.Error()) + var producer sarama.SyncProducer + var err error + for i := 0; i <= maxRetry; i++ { + producer, err = sarama.NewSyncProducer(p.addr, p.config) //Initialize the client + if err == nil { + p.producer = producer + return &p + } + //TODO If the password is wrong, exit directly + //if packetErr, ok := err.(*sarama.PacketEncodingError); ok { + //if _, ok := packetErr.Err.(sarama.AuthenticationError); ok { + // fmt.Println("Kafka password is wrong.") + //} + //} else { + // fmt.Printf("Failed to create Kafka producer: %v\n", err) + //} + time.Sleep(time.Duration(1) * time.Second) } p.producer = producer return &p diff --git a/pkg/common/mw/gin.go b/pkg/common/mw/gin.go index 65f98dca3..a544c2a25 100644 --- a/pkg/common/mw/gin.go +++ b/pkg/common/mw/gin.go @@ -31,6 +31,7 @@ import ( "github.com/OpenIMSDK/Open-IM-Server/pkg/errs" ) +// CorsHandler gin cross-domain configuration. func CorsHandler() gin.HandlerFunc { return func(c *gin.Context) { c.Writer.Header().Set("Access-Control-Allow-Origin", "*") @@ -39,19 +40,19 @@ func CorsHandler() gin.HandlerFunc { c.Header( "Access-Control-Expose-Headers", "Content-Length, Access-Control-Allow-Origin, Access-Control-Allow-Headers,Cache-Control,Content-Language,Content-Type,Expires,Last-Modified,Pragma,FooBar", - ) // 跨域关键设置 让浏览器可以解析 + ) // Cross-domain key settings allow browsers to resolve. c.Header( "Access-Control-Max-Age", "172800", - ) // 缓存请求信息 单位为秒 + ) // Cache request information in seconds. c.Header( "Access-Control-Allow-Credentials", "false", - ) // 跨域请求是否需要带cookie信息 默认设置为true + ) // Whether cross-domain requests need to carry cookie information, the default setting is true. c.Header( "content-type", "application/json", - ) // 设置返回格式是json + ) // Set the return format to json. //Release all option pre-requests if c.Request.Method == http.MethodOptions { c.JSON(http.StatusOK, "Options Request!")