pull/454/head
withchao 2 years ago
parent af6512539d
commit 6efbd92366

@@ -10,11 +10,11 @@ zookeeper:
 mysql:
   address: [ 127.0.0.1:13306 ] # MySQL address; only standalone is supported for now, default is fine
-  userName: root # MySQL username, recommended to change
+  username: root # MySQL username, recommended to change
   password: openIM123 # MySQL password, recommended to change
-  databaseName: openIM_v2 # default is fine
-  maxOpenConns: 100 #
-  maxIdleConns: 10 #
+  database: openIM_v2 # default is fine
+  maxOpenConn: 100 #
+  maxIdleConn: 10 #
   maxLifeTime: 5 #
   logLevel: 4 # 1=silent 2=error 3=warn 4=info
   slowThreshold: 500 # milliseconds
@@ -25,7 +25,7 @@ mongo:
   timeout: 60
   database: openIM # MongoDB database name, default is fine
   source: admin
-  userName: root # MongoDB username, recommended to leave unset at first
+  username: root # MongoDB username, recommended to leave unset at first
   password: openIM123 # MongoDB password, recommended to leave unset at first
   maxPoolSize: 100
   retainChatRecords: 3650 # how long (days) MongoDB keeps offline messages, adjust as needed
@@ -33,12 +33,12 @@ mongo:
 redis:
   address: [ 127.0.0.1:16379 ] # Redis address; one address for standalone; for a Redis cluster, list several node addresses (masters or replicas) to improve fault tolerance; default is fine
-  userName: # only redis version 6.0+ need username
-  passWord: openIM123 # Redis password, recommended to change
+  username: # only redis version 6.0+ need username
+  password: openIM123 # Redis password, recommended to change
 kafka:
-  SASLUserName:
-  SASLPassword:
+  username:
+  password:
   addr: [ 127.0.0.1:9092 ] # Kafka settings, default is fine
   latestMsgToRedis:
     topic: "latestMsgToRedis" #
@@ -70,13 +70,6 @@ api:
   openImApiPort: [ 10002 ] # API service ports
   listenIP:
-sdk:
-  openImSdkWsPort: [ 10003 ] # JS SDK service ports
-  dataDir: [ ../db/sdk/ ]
-  openImWsAddress: ws://127.0.0.1:10001
-  openImApiAddress: http://127.0.0.1:10002
 object:
   enable: minio
   apiURL: http://127.0.0.1:10002/third/object
@@ -114,7 +107,7 @@ object:
     externalId: # role extension ID
     roleSessionName: # role session name
 rpcPort: # RPC service ports, default is fine
   openImUserPort: [ 10110 ]
   openImFriendPort: [ 10120 ]
   openImMessagePort: [ 10130 ]
@@ -126,7 +119,7 @@ rpcPort: # RPC service ports, default is fine
   openImRtcPort: [ 10190 ]
   openImThirdPort: [ 10200 ]
 rpcRegisterName: # RPC registered service names, default is fine
   openImUserName: User
   openImFriendName: Friend
   openImMsgName: Msg
@@ -156,15 +149,21 @@ longConnSvr:
 push:
   enable: getui
   geTui: # Getui push
     pushUrl: "https://restapi.getui.com/v2/$appId"
     masterSecret: ""
     appKey: ""
     intent: ""
     channelID: ""
     channelName: ""
   fcm: # Firebase Cloud Messaging push
     serviceAccount: "openim-5c6c0-firebase-adminsdk-ppwol-8765884a78.json" # service-account file; this setting must be updated and the file placed in the config directory
+  jpns: # JPush; after applying for an account in the JPush console, the following four items must be changed
+    appKey:
+    masterSecret:
+    pushUrl:
+    pushIntent:
+    enable: false
 manager:
   # app admin userIDs and nicknames, used to log in to the admin console
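Note on the key renames above: the mixed-case keys (userName, passWord, databaseName, SASLUserName, SASLPassword) become lowercase username / password / database, and the struct tags in github.com/OpenIMSDK/Open-IM-Server/pkg/common/config are renamed in the same commit, so the YAML keys and tags stay in sync. A minimal sketch of that mapping, using gopkg.in/yaml.v3 as the config package does; the mysqlConf type here is illustrative, not the project's own:

package main

import (
    "fmt"

    "gopkg.in/yaml.v3"
)

// mysqlConf mirrors a slice of the renamed mysql section, for illustration only.
type mysqlConf struct {
    Username    string `yaml:"username"`
    Password    string `yaml:"password"`
    Database    string `yaml:"database"`
    MaxOpenConn int    `yaml:"maxOpenConn"`
}

func main() {
    raw := []byte("username: root\npassword: openIM123\ndatabase: openIM_v2\nmaxOpenConn: 100\n")
    var c mysqlConf
    if err := yaml.Unmarshal(raw, &c); err != nil {
        panic(err)
    }
    fmt.Printf("%+v\n", c) // {Username:root Password:openIM123 Database:openIM_v2 MaxOpenConn:100}
}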

@@ -23,7 +23,7 @@ func NewConsumerHandler(pusher *Pusher) *ConsumerHandler {
 	var consumerHandler ConsumerHandler
 	consumerHandler.pusher = pusher
 	consumerHandler.pushConsumerGroup = kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{KafkaVersion: sarama.V2_0_0_0,
-		OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false}, []string{config.Config.Kafka.Ms2pschat.Topic}, config.Config.Kafka.Addr,
+		OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false}, []string{config.Config.Kafka.MsgToPush.Topic}, config.Config.Kafka.Addr,
 		config.Config.Kafka.ConsumerGroupID.MsgToPush)
 	return &consumerHandler
 }

@@ -57,13 +57,12 @@ func NewPusher(discov discoveryregistry.SvcDiscoveryRegistry, offlinePusher offl
 func NewOfflinePusher(cache cache.MsgModel) offlinepush.OfflinePusher {
 	var offlinePusher offlinepush.OfflinePusher
-	if config.Config.Push.GeTui.Enable {
+	switch config.Config.Push.Enable {
+	case "getui":
 		offlinePusher = getui.NewClient(cache)
-	}
-	if config.Config.Push.Fcm.Enable {
+	case "fcm":
 		offlinePusher = fcm.NewClient(cache)
-	}
-	if config.Config.Push.Jpns.Enable {
+	case "jpush":
 		offlinePusher = jpush.NewClient()
 	}
 	return offlinePusher
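The three independent Enable flags are replaced by the single push.enable string ("getui" in the sample config), so exactly one offline pusher is constructed; a value that matches no case leaves offlinePusher nil. A sketch of a stricter variant that surfaces an unknown value as an error; it is illustrative only, assumes the same package and imports as NewOfflinePusher (plus fmt), and is not part of this commit:

func newOfflinePusherChecked(cache cache.MsgModel) (offlinepush.OfflinePusher, error) {
    // Same selection as NewOfflinePusher above, plus an explicit default case.
    switch config.Config.Push.Enable {
    case "getui":
        return getui.NewClient(cache), nil
    case "fcm":
        return fcm.NewClient(cache), nil
    case "jpush":
        return jpush.NewClient(), nil
    default:
        return nil, fmt.Errorf("unsupported push.enable value %q", config.Config.Push.Enable)
    }
}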

@@ -74,8 +74,8 @@ func (c *MsgTool) AllConversationClearMsgAndFixSeq() {
 func (c *MsgTool) ClearConversationsMsg(ctx context.Context, conversationIDs []string) {
 	for _, conversationID := range conversationIDs {
-		if err := c.msgDatabase.DeleteConversationMsgsAndSetMinSeq(ctx, conversationID, int64(config.Config.Mongo.DBRetainChatRecords*24*60*60)); err != nil {
-			log.ZError(ctx, "DeleteUserSuperGroupMsgsAndSetMinSeq failed", err, "conversationID", conversationID, "DBRetainChatRecords", config.Config.Mongo.DBRetainChatRecords)
+		if err := c.msgDatabase.DeleteConversationMsgsAndSetMinSeq(ctx, conversationID, int64(config.Config.Mongo.RetainChatRecords*24*60*60)); err != nil {
+			log.ZError(ctx, "DeleteUserSuperGroupMsgsAndSetMinSeq failed", err, "conversationID", conversationID, "DBRetainChatRecords", config.Config.Mongo.RetainChatRecords)
 		}
 		if err := c.checkMaxSeq(ctx, conversationID); err != nil {
 			log.ZError(ctx, "fixSeq failed", err, "conversationID", conversationID)
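Only the field name changes in this hunk, not the arithmetic: retainChatRecords is configured in days (3650 in the sample config) and multiplied by 24*60*60 here, i.e. converted to a retention window in seconds before it is handed to DeleteConversationMsgsAndSetMinSeq. For the default value:

// 3650 days -> 315,360,000 seconds passed to the msg database layer.
retainSeconds := int64(config.Config.Mongo.RetainChatRecords * 24 * 60 * 60)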

@@ -2,37 +2,15 @@ package config

 import (
 	_ "embed"
-	"fmt"
-	"io/ioutil"
-	"os"
-	"path/filepath"
-	"runtime"
-	"github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant"
-	"github.com/OpenIMSDK/Open-IM-Server/pkg/discoveryregistry"
-	"github.com/OpenIMSDK/Open-IM-Server/pkg/utils"
-	"gopkg.in/yaml.v3"
 )

 //go:embed version
 var Version string

-var (
-	_, b, _, _ = runtime.Caller(0)
-	// Root folder of this project
-	Root = filepath.Join(filepath.Dir(b), "../../..")
-)
-
-const (
-	FileName             = "config.yaml"
-	NotificationFileName = "notification.yaml"
-	ENV                  = "CONFIG_NAME"
-	DefaultFolderPath    = "../config/"
-	ConfKey              = "conf"
-)
-
-var Config config
+var Config struct {
+	config
+	Notification notification
+}

 type CallBackConfig struct {
 	Enable bool `yaml:"enable"`
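The hunk above replaces the package-level `var Config config` with an anonymous struct that embeds the unexported config type and adds a Notification field. Existing call sites keep the config.Config.Xxx form through Go field promotion, while notification settings move to config.Config.Notification. A simplified, self-contained sketch of that embedding pattern (types trimmed to single fields for illustration):

package main

import "fmt"

type notification struct{ GroupCreated bool }

type config struct {
    Mysql struct{ Username string }
}

// Config mirrors the shape introduced above: one part per YAML file.
var Config struct {
    config
    Notification notification
}

func main() {
    Config.Mysql.Username = "root"          // promoted from the embedded config (config.yaml)
    Config.Notification.GroupCreated = true // named field, loaded from notification.yaml in the real code
    fmt.Println(Config.Mysql.Username, Config.Notification.GroupCreated)
}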
@@ -63,49 +41,49 @@ type config struct {
 	} `yaml:"zookeeper"`
 	Mysql struct {
-		DBAddress []string `yaml:"address"`
-		DBUserName string `yaml:"userName"`
-		DBPassword string `yaml:"password"`
-		DBDatabaseName string `yaml:"databaseName"`
-		DBMaxOpenConns int `yaml:"maxOpenConns"`
-		DBMaxIdleConns int `yaml:"maxIdleConns"`
-		DBMaxLifeTime int `yaml:"maxLifeTime"`
+		Address []string `yaml:"address"`
+		Username string `yaml:"username"`
+		Password string `yaml:"password"`
+		Database string `yaml:"database"`
+		MaxOpenConn int `yaml:"maxOpenConn"`
+		MaxIdleConn int `yaml:"maxIdleConn"`
+		MaxLifeTime int `yaml:"maxLifeTime"`
 		LogLevel int `yaml:"logLevel"`
 		SlowThreshold int `yaml:"slowThreshold"`
 	} `yaml:"mysql"`
 	Mongo struct {
-		DBUri string `yaml:"uri"`
-		DBAddress []string `yaml:"address"`
-		DBTimeout int `yaml:"timeout"`
-		DBDatabase string `yaml:"database"`
-		DBSource string `yaml:"source"`
-		DBUserName string `yaml:"userName"`
-		DBPassword string `yaml:"password"`
-		DBMaxPoolSize int `yaml:"maxPoolSize"`
-		DBRetainChatRecords int `yaml:"retainChatRecords"`
+		Uri string `yaml:"uri"`
+		Address []string `yaml:"address"`
+		Timeout int `yaml:"timeout"`
+		Database string `yaml:"database"`
+		Source string `yaml:"source"`
+		Username string `yaml:"username"`
+		Password string `yaml:"password"`
+		MaxPoolSize int `yaml:"maxPoolSize"`
+		RetainChatRecords int `yaml:"retainChatRecords"`
 		ChatRecordsClearTime string `yaml:"chatRecordsClearTime"`
 	} `yaml:"mongo"`
 	Redis struct {
-		DBAddress []string `yaml:"address"`
-		DBUserName string `yaml:"userName"`
-		DBPassWord string `yaml:"passWord"`
+		Address []string `yaml:"address"`
+		Username string `yaml:"username"`
+		Password string `yaml:"passWord"`
 	} `yaml:"redis"`
 	Kafka struct {
-		SASLUserName string `yaml:"SASLUserName"`
-		SASLPassword string `yaml:"SASLPassword"`
+		Username string `yaml:"username"`
+		Password string `yaml:"password"`
 		Addr []string `yaml:"addr"`
 		LatestMsgToRedis struct {
 			Topic string `yaml:"topic"`
 		} `yaml:"latestMsgToRedis"`
-		OfflineMsgToMongoMysql struct {
+		MsgToMongo struct {
 			Topic string `yaml:"topic"`
 		} `yaml:"offlineMsgToMongoMysql"`
-		MsqToPush struct {
+		MsgToPush struct {
 			Topic string `yaml:"topic"`
-		} `yaml:"msqToPush"`
+		} `yaml:"msgToPush"`
 		MsgToModify struct {
 			Topic string `yaml:"topic"`
 		} `yaml:"msgToModify"`
@@ -127,10 +105,6 @@ type config struct {
 		ListenIP string `yaml:"listenIP"`
 	} `yaml:"api"`
-	Sdk struct {
-		DataDir []string `yaml:"dataDir"`
-	} `yaml:"sdk"`
 	Object struct {
 		Enable string `yaml:"enable"`
 		ApiURL string `yaml:"apiURL"`
@@ -174,7 +148,19 @@ type config struct {
 	} `yaml:"object"`
 	RpcPort struct {
+		OpenImUserPort []int `yaml:"openImUserPort"`
+		OpenImFriendPort []int `yaml:"openImFriendPort"`
+		OpenImMessagePort []int `yaml:"openImMessagePort"`
+		OpenImMessageGatewayPort []int `yaml:"openImMessageGatewayPort"`
+		OpenImGroupPort []int `yaml:"openImGroupPort"`
+		OpenImAuthPort []int `yaml:"openImAuthPort"`
+		OpenImPushPort []int `yaml:"openImPushPort"`
+		OpenImConversationPort []int `yaml:"openImConversationPort"`
+		OpenImCachePort []int `yaml:"openImCachePort"`
+		OpenImRtcPort []int `yaml:"openImRtcPort"`
+		OpenImThirdPort []int `yaml:"openImThirdPort"`
 	} `yaml:"rpcPort"`
 	RpcRegisterName struct {
 		OpenImUserName string `yaml:"openImUserName"`
 		OpenImFriendName string `yaml:"openImFriendName"`
@@ -199,9 +185,10 @@ type config struct {
 	} `yaml:"log"`
 	LongConnSvr struct {
-		WebsocketMaxConnNum int `yaml:"websocketMaxConnNum"`
-		WebsocketMaxMsgLen  int `yaml:"websocketMaxMsgLen"`
-		WebsocketTimeOut    int `yaml:"websocketTimeOut"`
+		OpenImWsPort        []int `yaml:"openImWsPort"`
+		WebsocketMaxConnNum int   `yaml:"websocketMaxConnNum"`
+		WebsocketMaxMsgLen  int   `yaml:"websocketMaxMsgLen"`
+		WebsocketTimeOut    int   `yaml:"websocketTimeOut"`
 	} `yaml:"longConnSvr"`

 	Push struct {
@@ -266,7 +253,6 @@ type config struct {
 		CallbackBeforeMemberJoinGroup CallBackConfig `yaml:"beforeMemberJoinGroup"`
 		CallbackBeforeSetGroupMemberInfo CallBackConfig `yaml:"beforeSetGroupMemberInfo"`
 	} `yaml:"callback"`
-	Notification Notification `yaml:"notification"`

 	Prometheus struct {
 		Enable bool `yaml:"enable"`
@@ -284,7 +270,7 @@ type config struct {
 	} `yaml:"prometheus"`
 }

-type Notification struct {
+type notification struct {
 	GroupCreated NotificationConf `yaml:"groupCreated"`
 	GroupInfoSet NotificationConf `yaml:"groupInfoSet"`
 	JoinGroupApplication NotificationConf `yaml:"joinGroupApplication"`
@@ -321,75 +307,3 @@ type Notification struct {
 	ConversationChanged NotificationConf `yaml:"conversationChanged"`
 	ConversationSetPrivate NotificationConf `yaml:"conversationSetPrivate"`
 }
-
-func GetOptionsByNotification(cfg NotificationConf) utils.Options {
-	opts := utils.NewOptions()
-	if cfg.UnreadCount {
-		opts = utils.WithOptions(opts, utils.WithUnreadCount(true))
-	}
-	if cfg.OfflinePush.Enable {
-		opts = utils.WithOptions(opts, utils.WithOfflinePush(true))
-	}
-	switch cfg.ReliabilityLevel {
-	case constant.UnreliableNotification:
-	case constant.ReliableNotificationNoMsg:
-		opts = utils.WithOptions(opts, utils.WithHistory(true), utils.WithPersistent())
-	}
-	opts = utils.WithOptions(opts, utils.WithSendMsg(cfg.IsSendMsg))
-	return opts
-}
-
-func (c *config) unmarshalConfig(config interface{}, configPath string) error {
-	bytes, err := ioutil.ReadFile(configPath)
-	if err != nil {
-		return err
-	}
-	if err = yaml.Unmarshal(bytes, config); err != nil {
-		return err
-	}
-	return nil
-}
-
-func (c *config) initConfig(config interface{}, configName, configFolderPath string) error {
-	if configFolderPath == "" {
-		configFolderPath = DefaultFolderPath
-	}
-	configPath := filepath.Join(configFolderPath, configName)
-	defer func() {
-		fmt.Println("use config", configPath)
-	}()
-	_, err := os.Stat(configPath)
-	if err != nil {
-		if !os.IsNotExist(err) {
-			return err
-		}
-		configPath = filepath.Join(Root, "config", configName)
-	} else {
-		Root = filepath.Dir(configPath)
-	}
-	return c.unmarshalConfig(config, configPath)
-}
-
-func (c *config) RegisterConf2Registry(registry discoveryregistry.SvcDiscoveryRegistry) error {
-	bytes, err := yaml.Marshal(Config)
-	if err != nil {
-		return err
-	}
-	return registry.RegisterConf2Registry(ConfKey, bytes)
-}
-
-func (c *config) GetConfFromRegistry(registry discoveryregistry.SvcDiscoveryRegistry) ([]byte, error) {
-	return registry.GetConfFromRegistry(ConfKey)
-}
-
-func InitConfig(configFolderPath string) error {
-	err := Config.initConfig(&Config, FileName, configFolderPath)
-	if err != nil {
-		return err
-	}
-	err = Config.initConfig(&Config.Notification, NotificationFileName, configFolderPath)
-	if err != nil {
-		return err
-	}
-	return nil
-}

@@ -0,0 +1,98 @@
+package config
+
+import (
+	"fmt"
+	"os"
+	"path/filepath"
+	"runtime"
+
+	"github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant"
+	"github.com/OpenIMSDK/Open-IM-Server/pkg/discoveryregistry"
+	"github.com/OpenIMSDK/Open-IM-Server/pkg/utils"
+	"gopkg.in/yaml.v3"
+)
+
+var (
+	_, b, _, _ = runtime.Caller(0)
+	// Root folder of this project
+	Root = filepath.Join(filepath.Dir(b), "../../..")
+)
+
+const (
+	FileName             = "config.yaml"
+	NotificationFileName = "notification.yaml"
+	ENV                  = "CONFIG_NAME"
+	DefaultFolderPath    = "../config/"
+	ConfKey              = "conf"
+)
+
+func GetOptionsByNotification(cfg NotificationConf) utils.Options {
+	opts := utils.NewOptions()
+	if cfg.UnreadCount {
+		opts = utils.WithOptions(opts, utils.WithUnreadCount(true))
+	}
+	if cfg.OfflinePush.Enable {
+		opts = utils.WithOptions(opts, utils.WithOfflinePush(true))
+	}
+	switch cfg.ReliabilityLevel {
+	case constant.UnreliableNotification:
+	case constant.ReliableNotificationNoMsg:
+		opts = utils.WithOptions(opts, utils.WithHistory(true), utils.WithPersistent())
+	}
+	opts = utils.WithOptions(opts, utils.WithSendMsg(cfg.IsSendMsg))
+	return opts
+}
+
+func (c *config) unmarshalConfig(config interface{}, configPath string) error {
+	bytes, err := os.ReadFile(configPath)
+	if err != nil {
+		return err
+	}
+	if err = yaml.Unmarshal(bytes, config); err != nil {
+		return err
+	}
+	return nil
+}
+
+func (c *config) initConfig(config interface{}, configName, configFolderPath string) error {
+	if configFolderPath == "" {
+		configFolderPath = DefaultFolderPath
+	}
+	configPath := filepath.Join(configFolderPath, configName)
+	defer func() {
+		fmt.Println("use config", configPath)
+	}()
+	_, err := os.Stat(configPath)
+	if err != nil {
+		if !os.IsNotExist(err) {
+			return err
+		}
+		configPath = filepath.Join(Root, "config", configName)
+	} else {
+		Root = filepath.Dir(configPath)
+	}
+	return c.unmarshalConfig(config, configPath)
+}
+
+func (c *config) RegisterConf2Registry(registry discoveryregistry.SvcDiscoveryRegistry) error {
+	bytes, err := yaml.Marshal(Config)
+	if err != nil {
+		return err
+	}
+	return registry.RegisterConf2Registry(ConfKey, bytes)
+}
+
+func (c *config) GetConfFromRegistry(registry discoveryregistry.SvcDiscoveryRegistry) ([]byte, error) {
+	return registry.GetConfFromRegistry(ConfKey)
+}
+
+func InitConfig(configFolderPath string) error {
+	err := Config.initConfig(&Config.config, FileName, configFolderPath)
+	if err != nil {
+		return err
+	}
+	err = Config.initConfig(&Config.Notification, NotificationFileName, configFolderPath)
+	if err != nil {
+		return err
+	}
+	return nil
+}
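The loading and registry helpers move into this new file essentially unchanged, apart from ioutil.ReadFile becoming os.ReadFile and InitConfig now unmarshalling config.yaml into &Config.config (the embedded part) instead of &Config. A usage sketch of the entrypoint call; the flag name and the printed field are illustrative, only config.InitConfig and config.Config come from this commit:

package main

import (
    "flag"
    "fmt"

    "github.com/OpenIMSDK/Open-IM-Server/pkg/common/config"
)

func main() {
    // An empty folder path falls back to DefaultFolderPath ("../config/") and then Root/config.
    configFolder := flag.String("config_folder_path", "", "folder containing config.yaml and notification.yaml")
    flag.Parse()
    if err := config.InitConfig(*configFolder); err != nil {
        panic(err)
    }
    fmt.Println("loaded mysql database:", config.Config.Mysql.Database)
}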

@@ -14,20 +14,20 @@ import (
 func NewRedis() (redis.UniversalClient, error) {
 	specialerror.AddReplace(redis.Nil, errs.ErrRecordNotFound)
 	var rdb redis.UniversalClient
-	if config.Config.Redis.EnableCluster {
+	if len(config.Config.Redis.Address) > 0 {
 		rdb = redis.NewClusterClient(&redis.ClusterOptions{
-			Addrs:    config.Config.Redis.DBAddress,
-			Username: config.Config.Redis.DBUserName,
-			Password: config.Config.Redis.DBPassWord, // no password set
+			Addrs:    config.Config.Redis.Address,
+			Username: config.Config.Redis.Username,
+			Password: config.Config.Redis.Password, // no password set
 			PoolSize: 50,
 		})
 	} else {
 		rdb = redis.NewClient(&redis.Options{
-			Addr:     config.Config.Redis.DBAddress[0],
-			Username: config.Config.Redis.DBUserName,
-			Password: config.Config.Redis.DBPassWord, // no password set
+			Addr:     config.Config.Redis.Address[0],
+			Username: config.Config.Redis.Username,
+			Password: config.Config.Redis.Password, // no password set
 			DB:       0,   // use default DB
 			PoolSize: 100, // connection pool size
 		})
 	}
 	ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
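With EnableCluster gone, the client type is chosen from the address list itself. Note that, as committed, len(config.Config.Redis.Address) > 0 routes every non-empty list, including a single standalone address, to the cluster client, so the single-node branch is only reachable with an empty list; that is an observation about this hunk, not a change made here. Either way the function returns a redis.UniversalClient, so callers can be written against one interface. A small go-redis v9 usage sketch; the cache package path is an assumption:

package main

import (
    "context"
    "time"

    "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/cache" // assumed location of NewRedis
)

func main() {
    rdb, err := cache.NewRedis()
    if err != nil {
        panic(err)
    }
    ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
    defer cancel()
    // Ping works identically for ClusterClient and Client via UniversalClient.
    if err := rdb.Ping(ctx).Err(); err != nil {
        panic(err)
    }
}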

@@ -2,7 +2,7 @@ package controller
 import (
 	"fmt"
-	"github.com/go-redis/redis"
+	"github.com/redis/go-redis/v9"
 	"time"

 	"github.com/OpenIMSDK/Open-IM-Server/pkg/common/config"
@@ -100,7 +100,7 @@ func NewCommonMsgDatabase(msgDocModel unRelationTb.MsgDocModelInterface, cacheMo
 		cache:            cacheModel,
 		producer:         kafka.NewKafkaProducer(config.Config.Kafka.Addr, config.Config.Kafka.LatestMsgToRedis.Topic),
 		producerToMongo:  kafka.NewKafkaProducer(config.Config.Kafka.Addr, config.Config.Kafka.MsgToMongo.Topic),
-		producerToPush:   kafka.NewKafkaProducer(config.Config.Kafka.Addr, config.Config.Kafka.Ms2pschat.Topic),
+		producerToPush:   kafka.NewKafkaProducer(config.Config.Kafka.Addr, config.Config.Kafka.MsgToPush.Topic),
 		producerToModify: kafka.NewKafkaProducer(config.Config.Kafka.Addr, config.Config.Kafka.MsgToModify.Topic),
 	}
 }

@@ -16,7 +16,7 @@ import (
 func newMysqlGormDB() (*gorm.DB, error) {
 	dsn := fmt.Sprintf("%s:%s@tcp(%s)/%s?charset=utf8mb4&parseTime=true&loc=Local",
-		config.Config.Mysql.DBUserName, config.Config.Mysql.DBPassword, config.Config.Mysql.DBAddress[0], "mysql")
+		config.Config.Mysql.Username, config.Config.Mysql.Password, config.Config.Mysql.Address[0], "mysql")
 	db, err := gorm.Open(mysql.Open(dsn), nil)
 	if err != nil {
 		time.Sleep(time.Duration(30) * time.Second)
@@ -30,13 +30,13 @@ func newMysqlGormDB() (*gorm.DB, error) {
 		return nil, err
 	}
 	defer sqlDB.Close()
-	sql := fmt.Sprintf("CREATE DATABASE IF NOT EXISTS %s default charset utf8mb4 COLLATE utf8mb4_unicode_ci;", config.Config.Mysql.DBDatabaseName)
+	sql := fmt.Sprintf("CREATE DATABASE IF NOT EXISTS %s default charset utf8mb4 COLLATE utf8mb4_unicode_ci;", config.Config.Mysql.Database)
 	err = db.Exec(sql).Error
 	if err != nil {
 		return nil, fmt.Errorf("init db %w", err)
 	}
 	dsn = fmt.Sprintf("%s:%s@tcp(%s)/%s?charset=utf8mb4&parseTime=true&loc=Local",
-		config.Config.Mysql.DBUserName, config.Config.Mysql.DBPassword, config.Config.Mysql.DBAddress[0], config.Config.Mysql.DBDatabaseName)
+		config.Config.Mysql.Username, config.Config.Mysql.Password, config.Config.Mysql.Address[0], config.Config.Mysql.Database)
 	sqlLogger := log.NewSqlLogger(logger.LogLevel(config.Config.Mysql.LogLevel), true, time.Duration(config.Config.Mysql.SlowThreshold)*time.Millisecond)
 	db, err = gorm.Open(mysql.Open(dsn), &gorm.Config{
 		Logger: sqlLogger,
@@ -48,9 +48,9 @@ func newMysqlGormDB() (*gorm.DB, error) {
 	if err != nil {
 		return nil, err
 	}
-	sqlDB.SetConnMaxLifetime(time.Second * time.Duration(config.Config.Mysql.DBMaxLifeTime))
-	sqlDB.SetMaxOpenConns(config.Config.Mysql.DBMaxOpenConns)
-	sqlDB.SetMaxIdleConns(config.Config.Mysql.DBMaxIdleConns)
+	sqlDB.SetConnMaxLifetime(time.Second * time.Duration(config.Config.Mysql.MaxLifeTime))
+	sqlDB.SetMaxOpenConns(config.Config.Mysql.MaxOpenConn)
+	sqlDB.SetMaxIdleConns(config.Config.Mysql.MaxIdleConn)
 	return db, nil
 }

@@ -23,27 +23,27 @@ type Mongo struct {
 func NewMongo() (*Mongo, error) {
 	specialerror.AddReplace(mongo.ErrNoDocuments, errs.ErrRecordNotFound)
 	uri := "mongodb://sample.host:27017/?maxPoolSize=20&w=majority"
-	if config.Config.Mongo.DBUri != "" {
+	if config.Config.Mongo.Uri != "" {
 		// example: mongodb://$user:$password@mongo1.mongo:27017,mongo2.mongo:27017,mongo3.mongo:27017/$DBDatabase/?replicaSet=rs0&readPreference=secondary&authSource=admin&maxPoolSize=$DBMaxPoolSize
-		uri = config.Config.Mongo.DBUri
+		uri = config.Config.Mongo.Uri
 	} else {
 		//mongodb://mongodb1.example.com:27317,mongodb2.example.com:27017/?replicaSet=mySet&authSource=authDB
 		mongodbHosts := ""
-		for i, v := range config.Config.Mongo.DBAddress {
-			if i == len(config.Config.Mongo.DBAddress)-1 {
+		for i, v := range config.Config.Mongo.Address {
+			if i == len(config.Config.Mongo.Address)-1 {
 				mongodbHosts += v
 			} else {
 				mongodbHosts += v + ","
 			}
 		}
-		if config.Config.Mongo.DBPassword != "" && config.Config.Mongo.DBUserName != "" {
+		if config.Config.Mongo.Password != "" && config.Config.Mongo.Username != "" {
 			uri = fmt.Sprintf("mongodb://%s:%s@%s/%s?maxPoolSize=%d&authSource=admin",
-				config.Config.Mongo.DBUserName, config.Config.Mongo.DBPassword, mongodbHosts,
-				config.Config.Mongo.DBDatabase, config.Config.Mongo.DBMaxPoolSize)
+				config.Config.Mongo.Username, config.Config.Mongo.Password, mongodbHosts,
+				config.Config.Mongo.Database, config.Config.Mongo.MaxPoolSize)
 		} else {
 			uri = fmt.Sprintf("mongodb://%s/%s/?maxPoolSize=%d&authSource=admin",
-				mongodbHosts, config.Config.Mongo.DBDatabase,
-				config.Config.Mongo.DBMaxPoolSize)
+				mongodbHosts, config.Config.Mongo.Database,
+				config.Config.Mongo.MaxPoolSize)
 		}
 	}
 	fmt.Println("mongo:", uri)
@@ -61,7 +61,7 @@ func (m *Mongo) GetClient() *mongo.Client {
 }

 func (m *Mongo) GetDatabase() *mongo.Database {
-	return m.db.Database(config.Config.Mongo.DBDatabase)
+	return m.db.Database(config.Config.Mongo.Database)
 }

 func (m *Mongo) CreateMsgIndex() error {
@@ -83,7 +83,7 @@ func (m *Mongo) CreateExtendMsgSetIndex() error {
 }

 func (m *Mongo) createMongoIndex(collection string, isUnique bool, keys ...string) error {
-	db := m.db.Database(config.Config.Mongo.DBDatabase).Collection(collection)
+	db := m.db.Database(config.Config.Mongo.Database).Collection(collection)
 	opts := options.CreateIndexes().SetMaxTime(10 * time.Second)
 	indexView := db.Indexes()
 	keysDoc := bsonx.Doc{}
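With the renamed Mongo fields, the fallback URI builder keeps its two shapes: one with credentials, one without. An illustrative reproduction of both format strings using the sample config values shown earlier (root / openIM123 / openIM / maxPoolSize 100); the host list is a placeholder, not from the PR:

package main

import "fmt"

func main() {
    hosts := "mongo1.example.com:27017,mongo2.example.com:27017" // placeholder hosts
    withAuth := fmt.Sprintf("mongodb://%s:%s@%s/%s?maxPoolSize=%d&authSource=admin",
        "root", "openIM123", hosts, "openIM", 100)
    withoutAuth := fmt.Sprintf("mongodb://%s/%s/?maxPoolSize=%d&authSource=admin",
        hosts, "openIM", 100)
    fmt.Println(withAuth)
    fmt.Println(withoutAuth) // keeps the extra "/" before the query string, exactly as NewMongo does
}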

@@ -21,10 +21,10 @@ func NewKafkaConsumer(addr []string, topic string) *Consumer {
 	p.Topic = topic
 	p.addr = addr
 	consumerConfig := sarama.NewConfig()
-	if config.Config.Kafka.SASLUserName != "" && config.Config.Kafka.SASLPassword != "" {
+	if config.Config.Kafka.Username != "" && config.Config.Kafka.Password != "" {
 		consumerConfig.Net.SASL.Enable = true
-		consumerConfig.Net.SASL.User = config.Config.Kafka.SASLUserName
-		consumerConfig.Net.SASL.Password = config.Config.Kafka.SASLPassword
+		consumerConfig.Net.SASL.User = config.Config.Kafka.Username
+		consumerConfig.Net.SASL.Password = config.Config.Kafka.Password
 	}
 	consumer, err := sarama.NewConsumer(p.addr, consumerConfig)
 	if err != nil {

@@ -32,10 +32,10 @@ func NewKafkaProducer(addr []string, topic string) *Producer {
 	p.config.Producer.Return.Errors = true
 	p.config.Producer.RequiredAcks = sarama.WaitForAll        //Set producer Message Reply level 0 1 all
 	p.config.Producer.Partitioner = sarama.NewHashPartitioner //Set the hash-key automatic hash partition. When sending a message, you must specify the key value of the message. If there is no key, the partition will be selected randomly
-	if config.Config.Kafka.SASLUserName != "" && config.Config.Kafka.SASLPassword != "" {
+	if config.Config.Kafka.Username != "" && config.Config.Kafka.Password != "" {
 		p.config.Net.SASL.Enable = true
-		p.config.Net.SASL.User = config.Config.Kafka.SASLUserName
-		p.config.Net.SASL.Password = config.Config.Kafka.SASLPassword
+		p.config.Net.SASL.User = config.Config.Kafka.Username
+		p.config.Net.SASL.Password = config.Config.Kafka.Password
 	}
 	p.addr = addr
 	p.topic = topic
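Both the consumer and the producer now read the lowercase kafka.username / kafka.password keys and enable SASL only when both are non-empty. A standalone sketch of the equivalent client setup, assuming the Shopify/sarama import path used elsewhere in the repo; the explicit Mechanism line and the sarama.NewClient call are additions for illustration, not part of this commit:

package main

import (
    "github.com/Shopify/sarama"

    "github.com/OpenIMSDK/Open-IM-Server/pkg/common/config"
)

func main() {
    // Assumes config.InitConfig has already been called.
    cfg := sarama.NewConfig()
    if config.Config.Kafka.Username != "" && config.Config.Kafka.Password != "" {
        cfg.Net.SASL.Enable = true
        cfg.Net.SASL.Mechanism = sarama.SASLTypePlaintext // PLAIN, stated explicitly here
        cfg.Net.SASL.User = config.Config.Kafka.Username
        cfg.Net.SASL.Password = config.Config.Kafka.Password
    }
    client, err := sarama.NewClient(config.Config.Kafka.Addr, cfg)
    if err != nil {
        panic(err)
    }
    defer client.Close()
}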

@@ -62,7 +62,7 @@ func GetClaimFromToken(tokensString string) (*Claims, error) {
 func CheckAccessV3(ctx context.Context, ownerUserID string) (err error) {
 	opUserID := mcontext.GetOpUserID(ctx)
-	if utils.IsContain(opUserID, config.Config.Manager.AppManagerUid) {
+	if utils.IsContain(opUserID, config.Config.Manager.AppManagerUserID) {
 		return nil
 	}
 	if opUserID == ownerUserID {
@@ -72,11 +72,11 @@ func CheckAccessV3(ctx context.Context, ownerUserID string) (err error) {
 }

 func IsAppManagerUid(ctx context.Context) bool {
-	return utils.IsContain(mcontext.GetOpUserID(ctx), config.Config.Manager.AppManagerUid)
+	return utils.IsContain(mcontext.GetOpUserID(ctx), config.Config.Manager.AppManagerUserID)
 }

 func CheckAdmin(ctx context.Context) error {
-	if utils.IsContain(mcontext.GetOpUserID(ctx), config.Config.Manager.AppManagerUid) {
+	if utils.IsContain(mcontext.GetOpUserID(ctx), config.Config.Manager.AppManagerUserID) {
 		return nil
 	}
 	return errs.ErrIdentity.Wrap(fmt.Sprintf("user %s is not admin userID", mcontext.GetOpUserID(ctx)))
@@ -86,7 +86,7 @@ func ParseRedisInterfaceToken(redisToken interface{}) (*Claims, error) {
 	return GetClaimFromToken(string(redisToken.([]uint8)))
 }

 func IsManagerUserID(opUserID string) bool {
-	return utils.IsContain(opUserID, config.Config.Manager.AppManagerUid)
+	return utils.IsContain(opUserID, config.Config.Manager.AppManagerUserID)
 }

 func WsVerifyToken(token, userID string, platformID int) error {
 	claim, err := GetClaimFromToken(token)
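All of these admin checks switch from Manager.AppManagerUid to Manager.AppManagerUserID; behaviour is otherwise unchanged. A hypothetical handler sketch showing where CheckAdmin sits in an admin-only RPC; the package name, service and request/response types, and the tokenverify import path are illustrative, only CheckAdmin itself is from the code above:

package admin

import (
    "context"

    tokenverify "github.com/OpenIMSDK/Open-IM-Server/pkg/common/tokenverify" // assumed import path
)

type forceLogoutReq struct{ UserID string }
type forceLogoutResp struct{}
type adminService struct{}

func (s *adminService) ForceLogout(ctx context.Context, req *forceLogoutReq) (*forceLogoutResp, error) {
    // Reject callers whose opUserID is not one of the configured app managers.
    if err := tokenverify.CheckAdmin(ctx); err != nil {
        return nil, err
    }
    // ... admin-only work ...
    return &forceLogoutResp{}, nil
}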
