Add retry mechanism to mongoDB, Redis, Kafka

pull/518/head
plutoyty 1 year ago
parent a735a70a7b
commit 35b1f75036

1
.gitignore vendored

@ -389,3 +389,4 @@ Sessionx.vim
[._]*.un~ [._]*.un~
# End of https://www.toptal.com/developers/gitignore/api/go,git,vim,tags,test,emacs,backup,jetbrains # End of https://www.toptal.com/developers/gitignore/api/go,git,vim,tags,test,emacs,backup,jetbrains
.idea

@ -27,6 +27,11 @@ import (
"github.com/OpenIMSDK/Open-IM-Server/pkg/errs" "github.com/OpenIMSDK/Open-IM-Server/pkg/errs"
) )
const (
maxRetry = 10 //number of retries
)
// NewRedis Initialize redis connection
func NewRedis() (redis.UniversalClient, error) { func NewRedis() (redis.UniversalClient, error) {
if len(config.Config.Redis.Address) == 0 { if len(config.Config.Redis.Address) == 0 {
return nil, errors.New("redis address is empty") return nil, errors.New("redis address is empty")
@ -39,6 +44,7 @@ func NewRedis() (redis.UniversalClient, error) {
Username: config.Config.Redis.Username, Username: config.Config.Redis.Username,
Password: config.Config.Redis.Password, // no password set Password: config.Config.Redis.Password, // no password set
PoolSize: 50, PoolSize: 50,
MaxRetries: maxRetry,
}) })
} else { } else {
rdb = redis.NewClient(&redis.Options{ rdb = redis.NewClient(&redis.Options{
@ -46,14 +52,17 @@ func NewRedis() (redis.UniversalClient, error) {
Username: config.Config.Redis.Username, Username: config.Config.Redis.Username,
Password: config.Config.Redis.Password, // no password set Password: config.Config.Redis.Password, // no password set
DB: 0, // use default DB DB: 0, // use default DB
PoolSize: 100, // 连接池大小 PoolSize: 100, // connection pool size
MaxRetries: maxRetry,
}) })
} }
var err error = nil
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel() defer cancel()
err := rdb.Ping(ctx).Err() err = rdb.Ping(ctx).Err()
if err != nil { if err != nil {
return nil, fmt.Errorf("redis ping %w", err) return nil, fmt.Errorf("redis ping %w", err)
} }
return rdb, nil return rdb, err
} }

@ -0,0 +1,30 @@
// Copyright © 2023 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package cache
import (
"fmt"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/config"
"testing"
)
//TestNewRedis Test redis connection
func TestNewRedis(t *testing.T) {
err := config.InitConfig("config_folder_path")
if err != nil {
fmt.Println("config load error")
return
}
}

@ -100,7 +100,7 @@ func (e *ExtendMsgSetMongoDriver) GetExtendMsgSet(
return &setList[0], nil return &setList[0], nil
} }
// first modify msg // InsertExtendMsg first modify msg.
func (e *ExtendMsgSetMongoDriver) InsertExtendMsg( func (e *ExtendMsgSetMongoDriver) InsertExtendMsg(
ctx context.Context, ctx context.Context,
conversationID string, conversationID string,
@ -130,7 +130,7 @@ func (e *ExtendMsgSetMongoDriver) InsertExtendMsg(
return utils.Wrap(err, "") return utils.Wrap(err, "")
} }
// insert or update // InsertOrUpdateReactionExtendMsgSet insert or update.
func (e *ExtendMsgSetMongoDriver) InsertOrUpdateReactionExtendMsgSet( func (e *ExtendMsgSetMongoDriver) InsertOrUpdateReactionExtendMsgSet(
ctx context.Context, ctx context.Context,
conversationID string, conversationID string,
@ -163,7 +163,7 @@ func (e *ExtendMsgSetMongoDriver) InsertOrUpdateReactionExtendMsgSet(
return utils.Wrap(err, "") return utils.Wrap(err, "")
} }
// delete TypeKey // DeleteReactionExtendMsgSet delete TypeKey.
func (e *ExtendMsgSetMongoDriver) DeleteReactionExtendMsgSet( func (e *ExtendMsgSetMongoDriver) DeleteReactionExtendMsgSet(
ctx context.Context, ctx context.Context,
conversationID string, conversationID string,

@ -31,19 +31,21 @@ import (
"github.com/OpenIMSDK/Open-IM-Server/pkg/utils" "github.com/OpenIMSDK/Open-IM-Server/pkg/utils"
) )
const (
maxRetry = 10 //number of retries
)
type Mongo struct { type Mongo struct {
db *mongo.Client db *mongo.Client
} }
// NewMongo Initialize MongoDB connection
func NewMongo() (*Mongo, error) { func NewMongo() (*Mongo, error) {
specialerror.AddReplace(mongo.ErrNoDocuments, errs.ErrRecordNotFound) specialerror.AddReplace(mongo.ErrNoDocuments, errs.ErrRecordNotFound)
uri := "mongodb://sample.host:27017/?maxPoolSize=20&w=majority" url := "mongodb://sample.host:27017/?maxPoolSize=20&w=majority"
if config.Config.Mongo.Uri != "" { if config.Config.Mongo.Uri != "" {
// example: url = config.Config.Mongo.Uri
// mongodb://$user:$password@mongo1.mongo:27017,mongo2.mongo:27017,mongo3.mongo:27017/$DBDatabase/?replicaSet=rs0&readPreference=secondary&authSource=admin&maxPoolSize=$DBMaxPoolSize
uri = config.Config.Mongo.Uri
} else { } else {
//mongodb://mongodb1.example.com:27317,mongodb2.example.com:27017/?replicaSet=mySet&authSource=authDB
mongodbHosts := "" mongodbHosts := ""
for i, v := range config.Config.Mongo.Address { for i, v := range config.Config.Mongo.Address {
if i == len(config.Config.Mongo.Address)-1 { if i == len(config.Config.Mongo.Address)-1 {
@ -53,23 +55,34 @@ func NewMongo() (*Mongo, error) {
} }
} }
if config.Config.Mongo.Password != "" && config.Config.Mongo.Username != "" { if config.Config.Mongo.Password != "" && config.Config.Mongo.Username != "" {
uri = fmt.Sprintf("mongodb://%s:%s@%s/%s?maxPoolSize=%d&authSource=admin", url = fmt.Sprintf("mongodb://%s:%s@%s/%s?maxPoolSize=%d&authSource=admin",
config.Config.Mongo.Username, config.Config.Mongo.Password, mongodbHosts, config.Config.Mongo.Username, config.Config.Mongo.Password, mongodbHosts,
config.Config.Mongo.Database, config.Config.Mongo.MaxPoolSize) config.Config.Mongo.Database, config.Config.Mongo.MaxPoolSize)
} else { } else {
uri = fmt.Sprintf("mongodb://%s/%s/?maxPoolSize=%d&authSource=admin", url = fmt.Sprintf("mongodb://%s/%s/?maxPoolSize=%d&authSource=admin",
mongodbHosts, config.Config.Mongo.Database, mongodbHosts, config.Config.Mongo.Database,
config.Config.Mongo.MaxPoolSize) config.Config.Mongo.MaxPoolSize)
} }
} }
fmt.Println("mongo:", uri) fmt.Println("mongo:", url)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*60) var mongoClient *mongo.Client
var err error = nil
for i := 0; i <= maxRetry; i++ {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel() defer cancel()
mongoClient, err := mongo.Connect(ctx, options.Client().ApplyURI(uri)) mongoClient, err = mongo.Connect(ctx, options.Client().ApplyURI(url))
if err != nil { if err == nil {
return &Mongo{db: mongoClient}, nil
}
if cmdErr, ok := err.(mongo.CommandError); ok {
if cmdErr.Code == 13 || cmdErr.Code == 18 {
return nil, err return nil, err
} else {
fmt.Printf("Failed to connect to MongoDB: %s\n", err)
} }
return &Mongo{db: mongoClient}, nil }
}
return nil, err
} }
func (m *Mongo) GetClient() *mongo.Client { func (m *Mongo) GetClient() *mongo.Client {

@ -1,9 +1,18 @@
/* // Copyright © 2023 OpenIM. All rights reserved.
** description(""). //
** copyright('open-im,www.open-im.io'). // Licensed under the Apache License, Version 2.0 (the "License");
** author("fg,Gordon@tuoyun.net"). // you may not use this file except in compliance with the License.
** time(2021/5/27 10:31). // You may obtain a copy of the License at
*/package http //
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package http
import ( import (
"bytes" "bytes"

@ -17,12 +17,12 @@ package kafka
import ( import (
"context" "context"
"errors" "errors"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/config" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/config"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant"
log "github.com/OpenIMSDK/Open-IM-Server/pkg/common/log" log "github.com/OpenIMSDK/Open-IM-Server/pkg/common/log"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/mcontext" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/mcontext"
"github.com/OpenIMSDK/Open-IM-Server/pkg/utils" "github.com/OpenIMSDK/Open-IM-Server/pkg/utils"
"time"
"github.com/Shopify/sarama" "github.com/Shopify/sarama"
"google.golang.org/protobuf/proto" "google.golang.org/protobuf/proto"
@ -30,6 +30,10 @@ import (
prome "github.com/OpenIMSDK/Open-IM-Server/pkg/common/prome" prome "github.com/OpenIMSDK/Open-IM-Server/pkg/common/prome"
) )
const (
maxRetry = 10 //number of retries
)
var errEmptyMsg = errors.New("binary msg is empty") var errEmptyMsg = errors.New("binary msg is empty")
type Producer struct { type Producer struct {
@ -39,6 +43,7 @@ type Producer struct {
producer sarama.SyncProducer producer sarama.SyncProducer
} }
// NewKafkaProducer Initialize kafka producer
func NewKafkaProducer(addr []string, topic string) *Producer { func NewKafkaProducer(addr []string, topic string) *Producer {
p := Producer{} p := Producer{}
p.config = sarama.NewConfig() //Instantiate a sarama Config p.config = sarama.NewConfig() //Instantiate a sarama Config
@ -53,9 +58,23 @@ func NewKafkaProducer(addr []string, topic string) *Producer {
} }
p.addr = addr p.addr = addr
p.topic = topic p.topic = topic
producer, err := sarama.NewSyncProducer(p.addr, p.config) //Initialize the client var producer sarama.SyncProducer
if err != nil { var err error
panic(err.Error()) for i := 0; i <= maxRetry; i++ {
producer, err = sarama.NewSyncProducer(p.addr, p.config) //Initialize the client
if err == nil {
p.producer = producer
return &p
}
//TODO If the password is wrong, exit directly
//if packetErr, ok := err.(*sarama.PacketEncodingError); ok {
//if _, ok := packetErr.Err.(sarama.AuthenticationError); ok {
// fmt.Println("Kafka password is wrong.")
//}
//} else {
// fmt.Printf("Failed to create Kafka producer: %v\n", err)
//}
time.Sleep(time.Duration(1) * time.Second)
} }
p.producer = producer p.producer = producer
return &p return &p

@ -31,6 +31,7 @@ import (
"github.com/OpenIMSDK/Open-IM-Server/pkg/errs" "github.com/OpenIMSDK/Open-IM-Server/pkg/errs"
) )
// CorsHandler gin cross-domain configuration.
func CorsHandler() gin.HandlerFunc { func CorsHandler() gin.HandlerFunc {
return func(c *gin.Context) { return func(c *gin.Context) {
c.Writer.Header().Set("Access-Control-Allow-Origin", "*") c.Writer.Header().Set("Access-Control-Allow-Origin", "*")
@ -39,19 +40,19 @@ func CorsHandler() gin.HandlerFunc {
c.Header( c.Header(
"Access-Control-Expose-Headers", "Access-Control-Expose-Headers",
"Content-Length, Access-Control-Allow-Origin, Access-Control-Allow-Headers,Cache-Control,Content-Language,Content-Type,Expires,Last-Modified,Pragma,FooBar", "Content-Length, Access-Control-Allow-Origin, Access-Control-Allow-Headers,Cache-Control,Content-Language,Content-Type,Expires,Last-Modified,Pragma,FooBar",
) // 跨域关键设置 让浏览器可以解析 ) // Cross-domain key settings allow browsers to resolve.
c.Header( c.Header(
"Access-Control-Max-Age", "Access-Control-Max-Age",
"172800", "172800",
) // 缓存请求信息 单位为秒 ) // Cache request information in seconds.
c.Header( c.Header(
"Access-Control-Allow-Credentials", "Access-Control-Allow-Credentials",
"false", "false",
) // 跨域请求是否需要带cookie信息 默认设置为true ) // Whether cross-domain requests need to carry cookie information, the default setting is true.
c.Header( c.Header(
"content-type", "content-type",
"application/json", "application/json",
) // 设置返回格式是json ) // Set the return format to json.
//Release all option pre-requests //Release all option pre-requests
if c.Request.Method == http.MethodOptions { if c.Request.Method == http.MethodOptions {
c.JSON(http.StatusOK, "Options Request!") c.JSON(http.StatusOK, "Options Request!")

Loading…
Cancel
Save