From 2b041d8f6a71efc73159051b9c3f53a46ebc0777 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E6=9D=A8=E8=85=BE=E5=AE=87?= <2631223275@qq.com>
Date: Tue, 11 Jul 2023 17:34:39 +0800
Subject: [PATCH 1/8] Add database retry
---
pkg/common/db/relation/mysql_init.go | 33 ++++++++++++++++++++++------
1 file changed, 26 insertions(+), 7 deletions(-)
diff --git a/pkg/common/db/relation/mysql_init.go b/pkg/common/db/relation/mysql_init.go
index 67419c7bf..fe467b675 100644
--- a/pkg/common/db/relation/mysql_init.go
+++ b/pkg/common/db/relation/mysql_init.go
@@ -30,16 +30,18 @@ import (
"gorm.io/gorm/logger"
)
+const (
+ maxRetry = 100
+)
+
+//newMysqlGormDB Initialize the database connection
func newMysqlGormDB() (*gorm.DB, error) {
dsn := fmt.Sprintf("%s:%s@tcp(%s)/%s?charset=utf8mb4&parseTime=true&loc=Local",
config.Config.Mysql.Username, config.Config.Mysql.Password, config.Config.Mysql.Address[0], "mysql")
- db, err := gorm.Open(mysql.Open(dsn), nil)
+
+ db, err := connectToDatabase(dsn, maxRetry)
if err != nil {
- time.Sleep(time.Duration(30) * time.Second)
- db, err = gorm.Open(mysql.Open(dsn), nil)
- if err != nil {
- panic(err.Error() + " open failed " + dsn)
- }
+ panic(err.Error() + " Open failed " + dsn)
}
sqlDB, err := db.DB()
if err != nil {
@@ -82,7 +84,24 @@ func newMysqlGormDB() (*gorm.DB, error) {
return db, nil
}
-// gorm mysql
+//connectToDatabase Connection retry for mysql
+func connectToDatabase(dsn string, maxRetry int) (*gorm.DB, error) {
+ var db *gorm.DB
+ var err error
+ for i := 0; i <= maxRetry; i++ {
+ db, err = gorm.Open(mysql.Open(dsn), nil)
+ if err == nil {
+ return db, nil
+ }
+ if mysqlErr, ok := err.(*mysqlDriver.MySQLError); ok && mysqlErr.Number == 1045 {
+ return nil, err
+ }
+ time.Sleep(time.Duration(1) * time.Second)
+ }
+ return nil, err
+}
+
+// NewGormDB gorm mysql
func NewGormDB() (*gorm.DB, error) {
specialerror.AddReplace(gorm.ErrRecordNotFound, errs.ErrRecordNotFound)
specialerror.AddErrHandler(replaceDuplicateKey)
From 2693db218ad2224a749a2b21dcc14a48502a2fac Mon Sep 17 00:00:00 2001
From: plutoyty <2631223275@qq.com>
Date: Wed, 12 Jul 2023 11:35:06 +0800
Subject: [PATCH 2/8] For "add database retry" add test
---
pkg/common/db/relation/mysql_init.go | 2 +-
pkg/common/db/relation/mysql_init_test.go | 37 +++++++++++++++++++++++
2 files changed, 38 insertions(+), 1 deletion(-)
create mode 100644 pkg/common/db/relation/mysql_init_test.go
diff --git a/pkg/common/db/relation/mysql_init.go b/pkg/common/db/relation/mysql_init.go
index fe467b675..8586ac338 100644
--- a/pkg/common/db/relation/mysql_init.go
+++ b/pkg/common/db/relation/mysql_init.go
@@ -31,7 +31,7 @@ import (
)
const (
- maxRetry = 100
+ maxRetry = 100 //number of retries
)
//newMysqlGormDB Initialize the database connection
diff --git a/pkg/common/db/relation/mysql_init_test.go b/pkg/common/db/relation/mysql_init_test.go
new file mode 100644
index 000000000..c6fd3e14b
--- /dev/null
+++ b/pkg/common/db/relation/mysql_init_test.go
@@ -0,0 +1,37 @@
+// Copyright © 2023 OpenIM. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package relation
+
+import (
+ "fmt"
+ "github.com/OpenIMSDK/Open-IM-Server/pkg/common/config"
+ "testing"
+)
+
+//TestNewGormDB Test the retry of sporadic errors and the direct exit of wrong password.
+func TestNewGormDB(t *testing.T) {
+ err := config.InitConfig("config_folder_path")
+ if err != nil {
+ return
+ }
+ db, err := newMysqlGormDB()
+ if err != nil {
+ fmt.Println("password error")
+ return
+ }
+ if db != nil {
+ fmt.Println("success connect")
+ }
+}
From 6bc369ed0087ff50afa98ba337baf816616205bf Mon Sep 17 00:00:00 2001
From: plutoyty <2631223275@qq.com>
Date: Wed, 12 Jul 2023 11:43:30 +0800
Subject: [PATCH 3/8] For "add database retry" add test
---
pkg/common/db/relation/mysql_init_test.go | 1 +
1 file changed, 1 insertion(+)
diff --git a/pkg/common/db/relation/mysql_init_test.go b/pkg/common/db/relation/mysql_init_test.go
index c6fd3e14b..a4f38f7bc 100644
--- a/pkg/common/db/relation/mysql_init_test.go
+++ b/pkg/common/db/relation/mysql_init_test.go
@@ -24,6 +24,7 @@ import (
func TestNewGormDB(t *testing.T) {
err := config.InitConfig("config_folder_path")
if err != nil {
+ fmt.Println("config load error")
return
}
db, err := newMysqlGormDB()
From 2f5bcf1cf71a683620b0cd6c3e0918f939e03dfe Mon Sep 17 00:00:00 2001
From: plutoyty <2631223275@qq.com>
Date: Wed, 12 Jul 2023 11:53:22 +0800
Subject: [PATCH 4/8] For 'add database retry' add test
Signed-off-by: plutoyty <2631223275@qq.com>
---
.idea/.gitignore | 8 ++++++++
.idea/Open-IM-Server.iml | 9 +++++++++
.idea/modules.xml | 8 ++++++++
.idea/restkit/RESTKit_CommonSetting.xml | 6 ++++++
.idea/restkit/RESTKit_Environment.xml | 6 ++++++
.idea/restkit/RESTKit_RequestSetting.xml | 7 +++++++
.idea/vcs.xml | 6 ++++++
7 files changed, 50 insertions(+)
create mode 100644 .idea/.gitignore
create mode 100644 .idea/Open-IM-Server.iml
create mode 100644 .idea/modules.xml
create mode 100644 .idea/restkit/RESTKit_CommonSetting.xml
create mode 100644 .idea/restkit/RESTKit_Environment.xml
create mode 100644 .idea/restkit/RESTKit_RequestSetting.xml
create mode 100644 .idea/vcs.xml
diff --git a/.idea/.gitignore b/.idea/.gitignore
new file mode 100644
index 000000000..35410cacd
--- /dev/null
+++ b/.idea/.gitignore
@@ -0,0 +1,8 @@
+# 默认忽略的文件
+/shelf/
+/workspace.xml
+# 基于编辑器的 HTTP 客户端请求
+/httpRequests/
+# Datasource local storage ignored files
+/dataSources/
+/dataSources.local.xml
diff --git a/.idea/Open-IM-Server.iml b/.idea/Open-IM-Server.iml
new file mode 100644
index 000000000..5e764c4f0
--- /dev/null
+++ b/.idea/Open-IM-Server.iml
@@ -0,0 +1,9 @@
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/modules.xml b/.idea/modules.xml
new file mode 100644
index 000000000..d9805dbb6
--- /dev/null
+++ b/.idea/modules.xml
@@ -0,0 +1,8 @@
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/restkit/RESTKit_CommonSetting.xml b/.idea/restkit/RESTKit_CommonSetting.xml
new file mode 100644
index 000000000..26e6a792c
--- /dev/null
+++ b/.idea/restkit/RESTKit_CommonSetting.xml
@@ -0,0 +1,6 @@
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/restkit/RESTKit_Environment.xml b/.idea/restkit/RESTKit_Environment.xml
new file mode 100644
index 000000000..cb8704de7
--- /dev/null
+++ b/.idea/restkit/RESTKit_Environment.xml
@@ -0,0 +1,6 @@
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/restkit/RESTKit_RequestSetting.xml b/.idea/restkit/RESTKit_RequestSetting.xml
new file mode 100644
index 000000000..6e47d4d1b
--- /dev/null
+++ b/.idea/restkit/RESTKit_RequestSetting.xml
@@ -0,0 +1,7 @@
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/vcs.xml b/.idea/vcs.xml
new file mode 100644
index 000000000..35eb1ddfb
--- /dev/null
+++ b/.idea/vcs.xml
@@ -0,0 +1,6 @@
+
+
+
+
+
+
\ No newline at end of file
From a735a70a7b8e3204bfc15da7eb9fb02a133e9bf8 Mon Sep 17 00:00:00 2001
From: plutoyty <2631223275@qq.com>
Date: Wed, 12 Jul 2023 12:05:32 +0800
Subject: [PATCH 5/8] Remove .idea directory and add it to .gitignore
---
.idea/.gitignore | 8 --------
.idea/Open-IM-Server.iml | 9 ---------
.idea/modules.xml | 8 --------
.idea/restkit/RESTKit_CommonSetting.xml | 6 ------
.idea/restkit/RESTKit_Environment.xml | 6 ------
.idea/restkit/RESTKit_RequestSetting.xml | 7 -------
.idea/vcs.xml | 6 ------
7 files changed, 50 deletions(-)
delete mode 100644 .idea/.gitignore
delete mode 100644 .idea/Open-IM-Server.iml
delete mode 100644 .idea/modules.xml
delete mode 100644 .idea/restkit/RESTKit_CommonSetting.xml
delete mode 100644 .idea/restkit/RESTKit_Environment.xml
delete mode 100644 .idea/restkit/RESTKit_RequestSetting.xml
delete mode 100644 .idea/vcs.xml
diff --git a/.idea/.gitignore b/.idea/.gitignore
deleted file mode 100644
index 35410cacd..000000000
--- a/.idea/.gitignore
+++ /dev/null
@@ -1,8 +0,0 @@
-# 默认忽略的文件
-/shelf/
-/workspace.xml
-# 基于编辑器的 HTTP 客户端请求
-/httpRequests/
-# Datasource local storage ignored files
-/dataSources/
-/dataSources.local.xml
diff --git a/.idea/Open-IM-Server.iml b/.idea/Open-IM-Server.iml
deleted file mode 100644
index 5e764c4f0..000000000
--- a/.idea/Open-IM-Server.iml
+++ /dev/null
@@ -1,9 +0,0 @@
-
-
-
-
-
-
-
-
-
\ No newline at end of file
diff --git a/.idea/modules.xml b/.idea/modules.xml
deleted file mode 100644
index d9805dbb6..000000000
--- a/.idea/modules.xml
+++ /dev/null
@@ -1,8 +0,0 @@
-
-
-
-
-
-
-
-
\ No newline at end of file
diff --git a/.idea/restkit/RESTKit_CommonSetting.xml b/.idea/restkit/RESTKit_CommonSetting.xml
deleted file mode 100644
index 26e6a792c..000000000
--- a/.idea/restkit/RESTKit_CommonSetting.xml
+++ /dev/null
@@ -1,6 +0,0 @@
-
-
-
-
-
-
\ No newline at end of file
diff --git a/.idea/restkit/RESTKit_Environment.xml b/.idea/restkit/RESTKit_Environment.xml
deleted file mode 100644
index cb8704de7..000000000
--- a/.idea/restkit/RESTKit_Environment.xml
+++ /dev/null
@@ -1,6 +0,0 @@
-
-
-
-
-
-
\ No newline at end of file
diff --git a/.idea/restkit/RESTKit_RequestSetting.xml b/.idea/restkit/RESTKit_RequestSetting.xml
deleted file mode 100644
index 6e47d4d1b..000000000
--- a/.idea/restkit/RESTKit_RequestSetting.xml
+++ /dev/null
@@ -1,7 +0,0 @@
-
-
-
-
-
-
-
\ No newline at end of file
diff --git a/.idea/vcs.xml b/.idea/vcs.xml
deleted file mode 100644
index 35eb1ddfb..000000000
--- a/.idea/vcs.xml
+++ /dev/null
@@ -1,6 +0,0 @@
-
-
-
-
-
-
\ No newline at end of file
From 35b1f75036c9939a894e6187c73ac2190ca07778 Mon Sep 17 00:00:00 2001
From: plutoyty <2631223275@qq.com>
Date: Wed, 12 Jul 2023 19:15:39 +0800
Subject: [PATCH 6/8] Add retry mechanism to mongoDB, Redis, Kafka
---
.gitignore | 1 +
config/config.yaml | 2 +-
pkg/common/db/cache/init_redis.go | 31 ++++++++++++-------
pkg/common/db/cache/init_redis_test.go | 30 +++++++++++++++++++
pkg/common/db/unrelation/extend_msg.go | 6 ++--
pkg/common/db/unrelation/mongo.go | 41 +++++++++++++++++---------
pkg/common/http/http_client.go | 21 +++++++++----
pkg/common/kafka/producer.go | 27 ++++++++++++++---
pkg/common/mw/gin.go | 9 +++---
9 files changed, 125 insertions(+), 43 deletions(-)
create mode 100644 pkg/common/db/cache/init_redis_test.go
diff --git a/.gitignore b/.gitignore
index 6e29bad2c..cabe1a427 100644
--- a/.gitignore
+++ b/.gitignore
@@ -389,3 +389,4 @@ Sessionx.vim
[._]*.un~
# End of https://www.toptal.com/developers/gitignore/api/go,git,vim,tags,test,emacs,backup,jetbrains
+.idea
\ No newline at end of file
diff --git a/config/config.yaml b/config/config.yaml
index ea6f8772d..4ff0e44b7 100644
--- a/config/config.yaml
+++ b/config/config.yaml
@@ -36,7 +36,7 @@ mysql:
mongo:
uri: #不为空则直接使用该值
address: [ 127.0.0.1:37017 ] #单机时为mongo地址,使用分片集群时,为mongos地址
- database: openIM_v3 #mongo db 默认即可
+ database: openIM_v3 #mongo db 默认即可
username: root #用户名
password: openIM123 #密码
maxPoolSize: 100
diff --git a/pkg/common/db/cache/init_redis.go b/pkg/common/db/cache/init_redis.go
index 72dfc8caf..be0431adf 100644
--- a/pkg/common/db/cache/init_redis.go
+++ b/pkg/common/db/cache/init_redis.go
@@ -27,6 +27,11 @@ import (
"github.com/OpenIMSDK/Open-IM-Server/pkg/errs"
)
+const (
+ maxRetry = 10 //number of retries
+)
+
+// NewRedis Initialize redis connection
func NewRedis() (redis.UniversalClient, error) {
if len(config.Config.Redis.Address) == 0 {
return nil, errors.New("redis address is empty")
@@ -35,25 +40,29 @@ func NewRedis() (redis.UniversalClient, error) {
var rdb redis.UniversalClient
if len(config.Config.Redis.Address) > 1 {
rdb = redis.NewClusterClient(&redis.ClusterOptions{
- Addrs: config.Config.Redis.Address,
- Username: config.Config.Redis.Username,
- Password: config.Config.Redis.Password, // no password set
- PoolSize: 50,
+ Addrs: config.Config.Redis.Address,
+ Username: config.Config.Redis.Username,
+ Password: config.Config.Redis.Password, // no password set
+ PoolSize: 50,
+ MaxRetries: maxRetry,
})
} else {
rdb = redis.NewClient(&redis.Options{
- Addr: config.Config.Redis.Address[0],
- Username: config.Config.Redis.Username,
- Password: config.Config.Redis.Password, // no password set
- DB: 0, // use default DB
- PoolSize: 100, // 连接池大小
+ Addr: config.Config.Redis.Address[0],
+ Username: config.Config.Redis.Username,
+ Password: config.Config.Redis.Password, // no password set
+ DB: 0, // use default DB
+ PoolSize: 100, // connection pool size
+ MaxRetries: maxRetry,
})
}
+
+ var err error = nil
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()
- err := rdb.Ping(ctx).Err()
+ err = rdb.Ping(ctx).Err()
if err != nil {
return nil, fmt.Errorf("redis ping %w", err)
}
- return rdb, nil
+ return rdb, err
}
diff --git a/pkg/common/db/cache/init_redis_test.go b/pkg/common/db/cache/init_redis_test.go
new file mode 100644
index 000000000..6f78a43bd
--- /dev/null
+++ b/pkg/common/db/cache/init_redis_test.go
@@ -0,0 +1,30 @@
+// Copyright © 2023 OpenIM. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package cache
+
+import (
+ "fmt"
+ "github.com/OpenIMSDK/Open-IM-Server/pkg/common/config"
+ "testing"
+)
+
+//TestNewRedis Test redis connection
+func TestNewRedis(t *testing.T) {
+ err := config.InitConfig("config_folder_path")
+ if err != nil {
+ fmt.Println("config load error")
+ return
+ }
+}
diff --git a/pkg/common/db/unrelation/extend_msg.go b/pkg/common/db/unrelation/extend_msg.go
index 17e0b2e19..77f65cbd2 100644
--- a/pkg/common/db/unrelation/extend_msg.go
+++ b/pkg/common/db/unrelation/extend_msg.go
@@ -100,7 +100,7 @@ func (e *ExtendMsgSetMongoDriver) GetExtendMsgSet(
return &setList[0], nil
}
-// first modify msg
+// InsertExtendMsg first modify msg.
func (e *ExtendMsgSetMongoDriver) InsertExtendMsg(
ctx context.Context,
conversationID string,
@@ -130,7 +130,7 @@ func (e *ExtendMsgSetMongoDriver) InsertExtendMsg(
return utils.Wrap(err, "")
}
-// insert or update
+// InsertOrUpdateReactionExtendMsgSet insert or update.
func (e *ExtendMsgSetMongoDriver) InsertOrUpdateReactionExtendMsgSet(
ctx context.Context,
conversationID string,
@@ -163,7 +163,7 @@ func (e *ExtendMsgSetMongoDriver) InsertOrUpdateReactionExtendMsgSet(
return utils.Wrap(err, "")
}
-// delete TypeKey
+// DeleteReactionExtendMsgSet delete TypeKey.
func (e *ExtendMsgSetMongoDriver) DeleteReactionExtendMsgSet(
ctx context.Context,
conversationID string,
diff --git a/pkg/common/db/unrelation/mongo.go b/pkg/common/db/unrelation/mongo.go
index 51b9e4b7e..911edef3c 100644
--- a/pkg/common/db/unrelation/mongo.go
+++ b/pkg/common/db/unrelation/mongo.go
@@ -31,19 +31,21 @@ import (
"github.com/OpenIMSDK/Open-IM-Server/pkg/utils"
)
+const (
+ maxRetry = 10 //number of retries
+)
+
type Mongo struct {
db *mongo.Client
}
+// NewMongo Initialize MongoDB connection
func NewMongo() (*Mongo, error) {
specialerror.AddReplace(mongo.ErrNoDocuments, errs.ErrRecordNotFound)
- uri := "mongodb://sample.host:27017/?maxPoolSize=20&w=majority"
+ url := "mongodb://sample.host:27017/?maxPoolSize=20&w=majority"
if config.Config.Mongo.Uri != "" {
- // example:
- // mongodb://$user:$password@mongo1.mongo:27017,mongo2.mongo:27017,mongo3.mongo:27017/$DBDatabase/?replicaSet=rs0&readPreference=secondary&authSource=admin&maxPoolSize=$DBMaxPoolSize
- uri = config.Config.Mongo.Uri
+ url = config.Config.Mongo.Uri
} else {
- //mongodb://mongodb1.example.com:27317,mongodb2.example.com:27017/?replicaSet=mySet&authSource=authDB
mongodbHosts := ""
for i, v := range config.Config.Mongo.Address {
if i == len(config.Config.Mongo.Address)-1 {
@@ -53,23 +55,34 @@ func NewMongo() (*Mongo, error) {
}
}
if config.Config.Mongo.Password != "" && config.Config.Mongo.Username != "" {
- uri = fmt.Sprintf("mongodb://%s:%s@%s/%s?maxPoolSize=%d&authSource=admin",
+ url = fmt.Sprintf("mongodb://%s:%s@%s/%s?maxPoolSize=%d&authSource=admin",
config.Config.Mongo.Username, config.Config.Mongo.Password, mongodbHosts,
config.Config.Mongo.Database, config.Config.Mongo.MaxPoolSize)
} else {
- uri = fmt.Sprintf("mongodb://%s/%s/?maxPoolSize=%d&authSource=admin",
+ url = fmt.Sprintf("mongodb://%s/%s/?maxPoolSize=%d&authSource=admin",
mongodbHosts, config.Config.Mongo.Database,
config.Config.Mongo.MaxPoolSize)
}
}
- fmt.Println("mongo:", uri)
- ctx, cancel := context.WithTimeout(context.Background(), time.Second*60)
- defer cancel()
- mongoClient, err := mongo.Connect(ctx, options.Client().ApplyURI(uri))
- if err != nil {
- return nil, err
+ fmt.Println("mongo:", url)
+ var mongoClient *mongo.Client
+ var err error = nil
+ for i := 0; i <= maxRetry; i++ {
+ ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
+ defer cancel()
+ mongoClient, err = mongo.Connect(ctx, options.Client().ApplyURI(url))
+ if err == nil {
+ return &Mongo{db: mongoClient}, nil
+ }
+ if cmdErr, ok := err.(mongo.CommandError); ok {
+ if cmdErr.Code == 13 || cmdErr.Code == 18 {
+ return nil, err
+ } else {
+ fmt.Printf("Failed to connect to MongoDB: %s\n", err)
+ }
+ }
}
- return &Mongo{db: mongoClient}, nil
+ return nil, err
}
func (m *Mongo) GetClient() *mongo.Client {
diff --git a/pkg/common/http/http_client.go b/pkg/common/http/http_client.go
index 72e3fae62..153deb30e 100644
--- a/pkg/common/http/http_client.go
+++ b/pkg/common/http/http_client.go
@@ -1,9 +1,18 @@
-/*
-** description("").
-** copyright('open-im,www.open-im.io').
-** author("fg,Gordon@tuoyun.net").
-** time(2021/5/27 10:31).
- */package http
+// Copyright © 2023 OpenIM. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package http
import (
"bytes"
diff --git a/pkg/common/kafka/producer.go b/pkg/common/kafka/producer.go
index 4c4ebc460..a66ef3dba 100644
--- a/pkg/common/kafka/producer.go
+++ b/pkg/common/kafka/producer.go
@@ -17,12 +17,12 @@ package kafka
import (
"context"
"errors"
-
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/config"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant"
log "github.com/OpenIMSDK/Open-IM-Server/pkg/common/log"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/mcontext"
"github.com/OpenIMSDK/Open-IM-Server/pkg/utils"
+ "time"
"github.com/Shopify/sarama"
"google.golang.org/protobuf/proto"
@@ -30,6 +30,10 @@ import (
prome "github.com/OpenIMSDK/Open-IM-Server/pkg/common/prome"
)
+const (
+ maxRetry = 10 //number of retries
+)
+
var errEmptyMsg = errors.New("binary msg is empty")
type Producer struct {
@@ -39,6 +43,7 @@ type Producer struct {
producer sarama.SyncProducer
}
+// NewKafkaProducer Initialize kafka producer
func NewKafkaProducer(addr []string, topic string) *Producer {
p := Producer{}
p.config = sarama.NewConfig() //Instantiate a sarama Config
@@ -53,9 +58,23 @@ func NewKafkaProducer(addr []string, topic string) *Producer {
}
p.addr = addr
p.topic = topic
- producer, err := sarama.NewSyncProducer(p.addr, p.config) //Initialize the client
- if err != nil {
- panic(err.Error())
+ var producer sarama.SyncProducer
+ var err error
+ for i := 0; i <= maxRetry; i++ {
+ producer, err = sarama.NewSyncProducer(p.addr, p.config) //Initialize the client
+ if err == nil {
+ p.producer = producer
+ return &p
+ }
+ //TODO If the password is wrong, exit directly
+ //if packetErr, ok := err.(*sarama.PacketEncodingError); ok {
+ //if _, ok := packetErr.Err.(sarama.AuthenticationError); ok {
+ // fmt.Println("Kafka password is wrong.")
+ //}
+ //} else {
+ // fmt.Printf("Failed to create Kafka producer: %v\n", err)
+ //}
+ time.Sleep(time.Duration(1) * time.Second)
}
p.producer = producer
return &p
diff --git a/pkg/common/mw/gin.go b/pkg/common/mw/gin.go
index 65f98dca3..a544c2a25 100644
--- a/pkg/common/mw/gin.go
+++ b/pkg/common/mw/gin.go
@@ -31,6 +31,7 @@ import (
"github.com/OpenIMSDK/Open-IM-Server/pkg/errs"
)
+// CorsHandler gin cross-domain configuration.
func CorsHandler() gin.HandlerFunc {
return func(c *gin.Context) {
c.Writer.Header().Set("Access-Control-Allow-Origin", "*")
@@ -39,19 +40,19 @@ func CorsHandler() gin.HandlerFunc {
c.Header(
"Access-Control-Expose-Headers",
"Content-Length, Access-Control-Allow-Origin, Access-Control-Allow-Headers,Cache-Control,Content-Language,Content-Type,Expires,Last-Modified,Pragma,FooBar",
- ) // 跨域关键设置 让浏览器可以解析
+ ) // Cross-domain key settings allow browsers to resolve.
c.Header(
"Access-Control-Max-Age",
"172800",
- ) // 缓存请求信息 单位为秒
+ ) // Cache request information in seconds.
c.Header(
"Access-Control-Allow-Credentials",
"false",
- ) // 跨域请求是否需要带cookie信息 默认设置为true
+ ) // Whether cross-domain requests need to carry cookie information, the default setting is true.
c.Header(
"content-type",
"application/json",
- ) // 设置返回格式是json
+ ) // Set the return format to json.
//Release all option pre-requests
if c.Request.Method == http.MethodOptions {
c.JSON(http.StatusOK, "Options Request!")
From d41f2a7aef2802f9656241873b25d23ff391d30a Mon Sep 17 00:00:00 2001
From: plutoyty <2631223275@qq.com>
Date: Wed, 12 Jul 2023 20:21:52 +0800
Subject: [PATCH 7/8] Add retry mechanism to mongoDB, Redis, Kafka
---
config/config.yaml | 59 +++++++++++++++++---------
pkg/common/db/cache/init_redis_test.go | 6 +++
pkg/common/kafka/producer.go | 3 ++
3 files changed, 47 insertions(+), 21 deletions(-)
diff --git a/config/config.yaml b/config/config.yaml
index a001c68c5..4ff0e44b7 100644
--- a/config/config.yaml
+++ b/config/config.yaml
@@ -56,11 +56,14 @@ kafka:
topic: "offlineMsgToMongoMysql" #不建议修改
msgToPush:
topic: "msgToPush" #不建议修改
+ msgToModify:
+ topic: "msgToModify" #不建议修改
consumerGroupID: #消费者组,不建议修改
msgToRedis: redis #
msgToMongo: mongo #
msgToMySql: mysql #
msgToPush: push #
+ msgToModify: modify #
rpc:
@@ -73,26 +76,41 @@ api:
listenIP: #默认为0.0.0.0
object:
- enable: "minio" #使用minio
- apiURL: "http://127.0.0.1:10002/object/"
+ enable: minio #使用minio
+ apiURL: http://127.0.0.1:10002/third/object
minio:
- bucket: "openim" #不建议修改
- endpoint: "http://127.0.0.1:10005" #minio对外服务的ip和端口,app要能访问此ip和端口
- accessKeyID: "root" #ID
- secretAccessKey: "openIM123" #秘钥
- sessionToken: "" #token
- cos: #tencent cos
- bucketURL: "https://temp-1252357374.cos.ap-chengdu.myqcloud.com"
- secretID: ""
- secretKey: ""
- sessionToken: ""
- oss: #ali oss
- endpoint: "https://oss-cn-chengdu.aliyuncs.com"
- bucket: "demo-9999999"
- bucketURL: "https://demo-9999999.oss-cn-chengdu.aliyuncs.com"
- accessKeyID: ""
- accessKeySecret: ""
- sessionToken: ""
+ tempBucket: "openim" #不建议修改
+ dataBucket: "openim" #不建议修改
+ location: us-east-1 #不建议修改
+ endpoint: http://127.0.0.1:10005 #minio对外服务的ip和端口,app要能访问此ip和端口
+ accessKeyID: root #ID
+ secretAccessKey: openIM123 #秘钥
+ isDistributedMod: false #是否分布式多硬盘部署,如果是多硬盘部署,需要修改为true
+ tencent: #tencent cos
+ appID:
+ region:
+ bucket:
+ secretID:
+ secretKey:
+ ali: #ali oss
+ regionID:
+ accessKeyID:
+ accessKeySecret:
+ stsEndpoint:
+ ossEndpoint:
+ bucket:
+ finalHost:
+ stsDurationSeconds:
+ OssRoleArn:
+ aws:
+ accessKeyID:
+ accessKeySecret:
+ region:
+ bucket:
+ finalHost:
+ roleArn:
+ externalId:
+ roleSessionName:
rpcPort: #rpc服务端口,不建议修改,端口由脚本读取后传入程序,如启动多个程序,只需要填入多个端口,用逗号隔开,如 [10110, 10111]
openImUserPort: [ 10110 ]
@@ -164,8 +182,7 @@ groupMessageHasReadReceiptEnable: true #群聊已读是否开
singleMessageHasReadReceiptEnable: true #单聊已读是否开启
retainChatRecords: 365 #mongo保存离线消息时间(天)
-chatRecordsClearTime: "0 2 * * 3" #每周三凌晨2点清理mongo中的过期(超过retainChatRecords时间)消息,这个删除是为了清理满足上个配置retainChatRecords的过期消息,不会发送通知,仅仅作为清理磁盘使用
-msgDestructTime: "0 2 * * *" #消息自动删除时间,每天凌晨2点删除过期消息,这个删除是为了删除保留时间超过超过会话字段msg_destruct_time(秒)的消息。
+chatRecordsClearTime: "0 2 * * 3" #每周三凌晨2点清理mongo中的过期(超过retainChatRecords时间)消息
secret: tuoyun #秘钥,获取token时校验
diff --git a/pkg/common/db/cache/init_redis_test.go b/pkg/common/db/cache/init_redis_test.go
index 6f78a43bd..7bf1a4a7d 100644
--- a/pkg/common/db/cache/init_redis_test.go
+++ b/pkg/common/db/cache/init_redis_test.go
@@ -27,4 +27,10 @@ func TestNewRedis(t *testing.T) {
fmt.Println("config load error")
return
}
+ redis, err := NewRedis()
+ if err != nil {
+ fmt.Println(err)
+ return
+ }
+ fmt.Println(redis)
}
diff --git a/pkg/common/kafka/producer.go b/pkg/common/kafka/producer.go
index a66ef3dba..a749c76f8 100644
--- a/pkg/common/kafka/producer.go
+++ b/pkg/common/kafka/producer.go
@@ -76,6 +76,9 @@ func NewKafkaProducer(addr []string, topic string) *Producer {
//}
time.Sleep(time.Duration(1) * time.Second)
}
+ if err != nil {
+ panic(err.Error())
+ }
p.producer = producer
return &p
}
From 772cb859fab14b5c9804f4c50c257e299f4932ba Mon Sep 17 00:00:00 2001
From: plutoyty <2631223275@qq.com>
Date: Thu, 13 Jul 2023 17:18:14 +0800
Subject: [PATCH 8/8] url to uri
---
pkg/common/db/unrelation/mongo.go | 12 ++++++------
1 file changed, 6 insertions(+), 6 deletions(-)
diff --git a/pkg/common/db/unrelation/mongo.go b/pkg/common/db/unrelation/mongo.go
index b2a99891e..9ce9929c5 100644
--- a/pkg/common/db/unrelation/mongo.go
+++ b/pkg/common/db/unrelation/mongo.go
@@ -42,9 +42,9 @@ type Mongo struct {
// NewMongo Initialize MongoDB connection
func NewMongo() (*Mongo, error) {
specialerror.AddReplace(mongo.ErrNoDocuments, errs.ErrRecordNotFound)
- url := "mongodb://sample.host:27017/?maxPoolSize=20&w=majority"
+ uri := "mongodb://sample.host:27017/?maxPoolSize=20&w=majority"
if config.Config.Mongo.Uri != "" {
- url = config.Config.Mongo.Uri
+ uri = config.Config.Mongo.Uri
} else {
mongodbHosts := ""
for i, v := range config.Config.Mongo.Address {
@@ -55,22 +55,22 @@ func NewMongo() (*Mongo, error) {
}
}
if config.Config.Mongo.Password != "" && config.Config.Mongo.Username != "" {
- url = fmt.Sprintf("mongodb://%s:%s@%s/%s?maxPoolSize=%d&authSource=admin",
+ uri = fmt.Sprintf("mongodb://%s:%s@%s/%s?maxPoolSize=%d&authSource=admin",
config.Config.Mongo.Username, config.Config.Mongo.Password, mongodbHosts,
config.Config.Mongo.Database, config.Config.Mongo.MaxPoolSize)
} else {
- url = fmt.Sprintf("mongodb://%s/%s/?maxPoolSize=%d&authSource=admin",
+ uri = fmt.Sprintf("mongodb://%s/%s/?maxPoolSize=%d&authSource=admin",
mongodbHosts, config.Config.Mongo.Database,
config.Config.Mongo.MaxPoolSize)
}
}
- fmt.Println("mongo:", url)
+ fmt.Println("mongo:", uri)
var mongoClient *mongo.Client
var err error = nil
for i := 0; i <= maxRetry; i++ {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()
- mongoClient, err = mongo.Connect(ctx, options.Client().ApplyURI(url))
+ mongoClient, err = mongo.Connect(ctx, options.Client().ApplyURI(uri))
if err == nil {
return &Mongo{db: mongoClient}, nil
}