add MongoDB,Redis,Kafka retry mechanism (#518)

pull/526/head
pluto 12 months ago committed by GitHub
parent 7bf8a898e2
commit 058e2eee32
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

1
.gitignore vendored

@ -391,3 +391,4 @@ Sessionx.vim
[._]*.un~ [._]*.un~
# End of https://www.toptal.com/developers/gitignore/api/go,git,vim,tags,test,emacs,backup,jetbrains # End of https://www.toptal.com/developers/gitignore/api/go,git,vim,tags,test,emacs,backup,jetbrains
.idea

@ -36,7 +36,7 @@ mysql:
mongo: mongo:
uri: #不为空则直接使用该值 uri: #不为空则直接使用该值
address: [ 127.0.0.1:37017 ] #单机时为mongo地址使用分片集群时为mongos地址 address: [ 127.0.0.1:37017 ] #单机时为mongo地址使用分片集群时为mongos地址
database: openIM_v3 #mongo db 默认即可 database: openIM_v3 #mongo db 默认即可
username: root #用户名 username: root #用户名
password: openIM123 #密码 password: openIM123 #密码
maxPoolSize: 100 maxPoolSize: 100
@ -56,11 +56,14 @@ kafka:
topic: "offlineMsgToMongoMysql" #不建议修改 topic: "offlineMsgToMongoMysql" #不建议修改
msgToPush: msgToPush:
topic: "msgToPush" #不建议修改 topic: "msgToPush" #不建议修改
msgToModify:
topic: "msgToModify" #不建议修改
consumerGroupID: #消费者组,不建议修改 consumerGroupID: #消费者组,不建议修改
msgToRedis: redis # msgToRedis: redis #
msgToMongo: mongo # msgToMongo: mongo #
msgToMySql: mysql # msgToMySql: mysql #
msgToPush: push # msgToPush: push #
msgToModify: modify #
rpc: rpc:
@ -73,26 +76,41 @@ api:
listenIP: #默认为0.0.0.0 listenIP: #默认为0.0.0.0
object: object:
enable: "minio" #使用minio enable: minio #使用minio
apiURL: "http://127.0.0.1:10002/object/" apiURL: http://127.0.0.1:10002/third/object
minio: minio:
bucket: "openim" #不建议修改 tempBucket: "openim" #不建议修改
endpoint: "http://127.0.0.1:10005" #minio对外服务的ip和端口app要能访问此ip和端口 dataBucket: "openim" #不建议修改
accessKeyID: "root" #ID location: us-east-1 #不建议修改
secretAccessKey: "openIM123" #秘钥 endpoint: http://127.0.0.1:10005 #minio对外服务的ip和端口app要能访问此ip和端口
sessionToken: "" #token accessKeyID: root #ID
cos: #tencent cos secretAccessKey: openIM123 #秘钥
bucketURL: "https://temp-1252357374.cos.ap-chengdu.myqcloud.com" isDistributedMod: false #是否分布式多硬盘部署如果是多硬盘部署需要修改为true
secretID: "" tencent: #tencent cos
secretKey: "" appID:
sessionToken: "" region:
oss: #ali oss bucket:
endpoint: "https://oss-cn-chengdu.aliyuncs.com" secretID:
bucket: "demo-9999999" secretKey:
bucketURL: "https://demo-9999999.oss-cn-chengdu.aliyuncs.com" ali: #ali oss
accessKeyID: "" regionID:
accessKeySecret: "" accessKeyID:
sessionToken: "" accessKeySecret:
stsEndpoint:
ossEndpoint:
bucket:
finalHost:
stsDurationSeconds:
OssRoleArn:
aws:
accessKeyID:
accessKeySecret:
region:
bucket:
finalHost:
roleArn:
externalId:
roleSessionName:
rpcPort: #rpc服务端口不建议修改端口由脚本读取后传入程序如启动多个程序只需要填入多个端口用逗号隔开如 [10110, 10111] rpcPort: #rpc服务端口不建议修改端口由脚本读取后传入程序如启动多个程序只需要填入多个端口用逗号隔开如 [10110, 10111]
openImUserPort: [ 10110 ] openImUserPort: [ 10110 ]
@ -164,8 +182,7 @@ groupMessageHasReadReceiptEnable: true #群聊已读是否开
singleMessageHasReadReceiptEnable: true #单聊已读是否开启 singleMessageHasReadReceiptEnable: true #单聊已读是否开启
retainChatRecords: 365 #mongo保存离线消息时间 retainChatRecords: 365 #mongo保存离线消息时间
chatRecordsClearTime: "0 2 * * 3" #每周三凌晨2点清理mongo中的过期超过retainChatRecords时间消息这个删除是为了清理满足上个配置retainChatRecords的过期消息不会发送通知仅仅作为清理磁盘使用 chatRecordsClearTime: "0 2 * * 3" #每周三凌晨2点清理mongo中的过期超过retainChatRecords时间消息
msgDestructTime: "0 2 * * *" #消息自动删除时间每天凌晨2点删除过期消息这个删除是为了删除保留时间超过超过会话字段msg_destruct_time的消息。
secret: tuoyun #秘钥获取token时校验 secret: tuoyun #秘钥获取token时校验

@ -27,6 +27,11 @@ import (
"github.com/OpenIMSDK/Open-IM-Server/pkg/errs" "github.com/OpenIMSDK/Open-IM-Server/pkg/errs"
) )
const (
maxRetry = 10 //number of retries
)
// NewRedis Initialize redis connection
func NewRedis() (redis.UniversalClient, error) { func NewRedis() (redis.UniversalClient, error) {
if len(config.Config.Redis.Address) == 0 { if len(config.Config.Redis.Address) == 0 {
return nil, errors.New("redis address is empty") return nil, errors.New("redis address is empty")
@ -35,25 +40,29 @@ func NewRedis() (redis.UniversalClient, error) {
var rdb redis.UniversalClient var rdb redis.UniversalClient
if len(config.Config.Redis.Address) > 1 { if len(config.Config.Redis.Address) > 1 {
rdb = redis.NewClusterClient(&redis.ClusterOptions{ rdb = redis.NewClusterClient(&redis.ClusterOptions{
Addrs: config.Config.Redis.Address, Addrs: config.Config.Redis.Address,
Username: config.Config.Redis.Username, Username: config.Config.Redis.Username,
Password: config.Config.Redis.Password, // no password set Password: config.Config.Redis.Password, // no password set
PoolSize: 50, PoolSize: 50,
MaxRetries: maxRetry,
}) })
} else { } else {
rdb = redis.NewClient(&redis.Options{ rdb = redis.NewClient(&redis.Options{
Addr: config.Config.Redis.Address[0], Addr: config.Config.Redis.Address[0],
Username: config.Config.Redis.Username, Username: config.Config.Redis.Username,
Password: config.Config.Redis.Password, // no password set Password: config.Config.Redis.Password, // no password set
DB: 0, // use default DB DB: 0, // use default DB
PoolSize: 100, // 连接池大小 PoolSize: 100, // connection pool size
MaxRetries: maxRetry,
}) })
} }
var err error = nil
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel() defer cancel()
err := rdb.Ping(ctx).Err() err = rdb.Ping(ctx).Err()
if err != nil { if err != nil {
return nil, fmt.Errorf("redis ping %w", err) return nil, fmt.Errorf("redis ping %w", err)
} }
return rdb, nil return rdb, err
} }

@ -0,0 +1,36 @@
// Copyright © 2023 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package cache
import (
"fmt"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/config"
"testing"
)
//TestNewRedis Test redis connection
func TestNewRedis(t *testing.T) {
err := config.InitConfig("config_folder_path")
if err != nil {
fmt.Println("config load error")
return
}
redis, err := NewRedis()
if err != nil {
fmt.Println(err)
return
}
fmt.Println(redis)
}

@ -31,19 +31,21 @@ import (
"github.com/OpenIMSDK/Open-IM-Server/pkg/utils" "github.com/OpenIMSDK/Open-IM-Server/pkg/utils"
) )
const (
maxRetry = 10 //number of retries
)
type Mongo struct { type Mongo struct {
db *mongo.Client db *mongo.Client
} }
// NewMongo Initialize MongoDB connection
func NewMongo() (*Mongo, error) { func NewMongo() (*Mongo, error) {
specialerror.AddReplace(mongo.ErrNoDocuments, errs.ErrRecordNotFound) specialerror.AddReplace(mongo.ErrNoDocuments, errs.ErrRecordNotFound)
uri := "mongodb://sample.host:27017/?maxPoolSize=20&w=majority" url := "mongodb://sample.host:27017/?maxPoolSize=20&w=majority"
if config.Config.Mongo.Uri != "" { if config.Config.Mongo.Uri != "" {
// example: url = config.Config.Mongo.Uri
// mongodb://$user:$password@mongo1.mongo:27017,mongo2.mongo:27017,mongo3.mongo:27017/$DBDatabase/?replicaSet=rs0&readPreference=secondary&authSource=admin&maxPoolSize=$DBMaxPoolSize
uri = config.Config.Mongo.Uri
} else { } else {
//mongodb://mongodb1.example.com:27317,mongodb2.example.com:27017/?replicaSet=mySet&authSource=authDB
mongodbHosts := "" mongodbHosts := ""
for i, v := range config.Config.Mongo.Address { for i, v := range config.Config.Mongo.Address {
if i == len(config.Config.Mongo.Address)-1 { if i == len(config.Config.Mongo.Address)-1 {
@ -53,23 +55,34 @@ func NewMongo() (*Mongo, error) {
} }
} }
if config.Config.Mongo.Password != "" && config.Config.Mongo.Username != "" { if config.Config.Mongo.Password != "" && config.Config.Mongo.Username != "" {
uri = fmt.Sprintf("mongodb://%s:%s@%s/%s?maxPoolSize=%d&authSource=admin", url = fmt.Sprintf("mongodb://%s:%s@%s/%s?maxPoolSize=%d&authSource=admin",
config.Config.Mongo.Username, config.Config.Mongo.Password, mongodbHosts, config.Config.Mongo.Username, config.Config.Mongo.Password, mongodbHosts,
config.Config.Mongo.Database, config.Config.Mongo.MaxPoolSize) config.Config.Mongo.Database, config.Config.Mongo.MaxPoolSize)
} else { } else {
uri = fmt.Sprintf("mongodb://%s/%s/?maxPoolSize=%d&authSource=admin", url = fmt.Sprintf("mongodb://%s/%s/?maxPoolSize=%d&authSource=admin",
mongodbHosts, config.Config.Mongo.Database, mongodbHosts, config.Config.Mongo.Database,
config.Config.Mongo.MaxPoolSize) config.Config.Mongo.MaxPoolSize)
} }
} }
fmt.Println("mongo:", uri) fmt.Println("mongo:", url)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*60) var mongoClient *mongo.Client
defer cancel() var err error = nil
mongoClient, err := mongo.Connect(ctx, options.Client().ApplyURI(uri)) for i := 0; i <= maxRetry; i++ {
if err != nil { ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
return nil, err defer cancel()
mongoClient, err = mongo.Connect(ctx, options.Client().ApplyURI(url))
if err == nil {
return &Mongo{db: mongoClient}, nil
}
if cmdErr, ok := err.(mongo.CommandError); ok {
if cmdErr.Code == 13 || cmdErr.Code == 18 {
return nil, err
} else {
fmt.Printf("Failed to connect to MongoDB: %s\n", err)
}
}
} }
return &Mongo{db: mongoClient}, nil return nil, err
} }
func (m *Mongo) GetClient() *mongo.Client { func (m *Mongo) GetClient() *mongo.Client {

@ -1,9 +1,18 @@
/* // Copyright © 2023 OpenIM. All rights reserved.
** description(""). //
** copyright('open-im,www.open-im.io'). // Licensed under the Apache License, Version 2.0 (the "License");
** author("fg,Gordon@tuoyun.net"). // you may not use this file except in compliance with the License.
** time(2021/5/27 10:31). // You may obtain a copy of the License at
*/package http //
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package http
import ( import (
"bytes" "bytes"

@ -17,12 +17,12 @@ package kafka
import ( import (
"context" "context"
"errors" "errors"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/config" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/config"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant"
log "github.com/OpenIMSDK/Open-IM-Server/pkg/common/log" log "github.com/OpenIMSDK/Open-IM-Server/pkg/common/log"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/mcontext" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/mcontext"
"github.com/OpenIMSDK/Open-IM-Server/pkg/utils" "github.com/OpenIMSDK/Open-IM-Server/pkg/utils"
"time"
"github.com/Shopify/sarama" "github.com/Shopify/sarama"
"google.golang.org/protobuf/proto" "google.golang.org/protobuf/proto"
@ -30,6 +30,10 @@ import (
prome "github.com/OpenIMSDK/Open-IM-Server/pkg/common/prome" prome "github.com/OpenIMSDK/Open-IM-Server/pkg/common/prome"
) )
const (
maxRetry = 10 //number of retries
)
var errEmptyMsg = errors.New("binary msg is empty") var errEmptyMsg = errors.New("binary msg is empty")
type Producer struct { type Producer struct {
@ -39,6 +43,7 @@ type Producer struct {
producer sarama.SyncProducer producer sarama.SyncProducer
} }
// NewKafkaProducer Initialize kafka producer
func NewKafkaProducer(addr []string, topic string) *Producer { func NewKafkaProducer(addr []string, topic string) *Producer {
p := Producer{} p := Producer{}
p.config = sarama.NewConfig() //Instantiate a sarama Config p.config = sarama.NewConfig() //Instantiate a sarama Config
@ -53,7 +58,24 @@ func NewKafkaProducer(addr []string, topic string) *Producer {
} }
p.addr = addr p.addr = addr
p.topic = topic p.topic = topic
producer, err := sarama.NewSyncProducer(p.addr, p.config) //Initialize the client var producer sarama.SyncProducer
var err error
for i := 0; i <= maxRetry; i++ {
producer, err = sarama.NewSyncProducer(p.addr, p.config) //Initialize the client
if err == nil {
p.producer = producer
return &p
}
//TODO If the password is wrong, exit directly
//if packetErr, ok := err.(*sarama.PacketEncodingError); ok {
//if _, ok := packetErr.Err.(sarama.AuthenticationError); ok {
// fmt.Println("Kafka password is wrong.")
//}
//} else {
// fmt.Printf("Failed to create Kafka producer: %v\n", err)
//}
time.Sleep(time.Duration(1) * time.Second)
}
if err != nil { if err != nil {
panic(err.Error()) panic(err.Error())
} }

@ -31,6 +31,7 @@ import (
"github.com/OpenIMSDK/Open-IM-Server/pkg/errs" "github.com/OpenIMSDK/Open-IM-Server/pkg/errs"
) )
// CorsHandler gin cross-domain configuration.
func CorsHandler() gin.HandlerFunc { func CorsHandler() gin.HandlerFunc {
return func(c *gin.Context) { return func(c *gin.Context) {
c.Writer.Header().Set("Access-Control-Allow-Origin", "*") c.Writer.Header().Set("Access-Control-Allow-Origin", "*")
@ -39,19 +40,19 @@ func CorsHandler() gin.HandlerFunc {
c.Header( c.Header(
"Access-Control-Expose-Headers", "Access-Control-Expose-Headers",
"Content-Length, Access-Control-Allow-Origin, Access-Control-Allow-Headers,Cache-Control,Content-Language,Content-Type,Expires,Last-Modified,Pragma,FooBar", "Content-Length, Access-Control-Allow-Origin, Access-Control-Allow-Headers,Cache-Control,Content-Language,Content-Type,Expires,Last-Modified,Pragma,FooBar",
) // 跨域关键设置 让浏览器可以解析 ) // Cross-domain key settings allow browsers to resolve.
c.Header( c.Header(
"Access-Control-Max-Age", "Access-Control-Max-Age",
"172800", "172800",
) // 缓存请求信息 单位为秒 ) // Cache request information in seconds.
c.Header( c.Header(
"Access-Control-Allow-Credentials", "Access-Control-Allow-Credentials",
"false", "false",
) // 跨域请求是否需要带cookie信息 默认设置为true ) // Whether cross-domain requests need to carry cookie information, the default setting is true.
c.Header( c.Header(
"content-type", "content-type",
"application/json", "application/json",
) // 设置返回格式是json ) // Set the return format to json.
//Release all option pre-requests //Release all option pre-requests
if c.Request.Method == http.MethodOptions { if c.Request.Method == http.MethodOptions {
c.JSON(http.StatusOK, "Options Request!") c.JSON(http.StatusOK, "Options Request!")

Loading…
Cancel
Save