Merge branch 'v3' into ver3

# Conflicts:
#	pkg/common/config/config.go
#	pkg/startrpc/start.go
pull/458/head
withchao 2 years ago
commit ef8b4953f8

@ -1,7 +1,6 @@
package main
import (
"bytes"
"context"
"fmt"
"net"
@ -10,8 +9,6 @@ import (
"strconv"
"time"
"gopkg.in/yaml.v3"
"net/http"
_ "net/http/pprof"
@ -55,7 +52,7 @@ func run(port int) error {
fmt.Println("api start init discov client")
var client discoveryregistry.SvcDiscoveryRegistry
client, err = openKeeper.NewClient(config.Config.Zookeeper.ZkAddr, config.Config.Zookeeper.Schema,
openKeeper.WithFreq(time.Hour), openKeeper.WithUserNameAndPassword(config.Config.Zookeeper.UserName,
openKeeper.WithFreq(time.Hour), openKeeper.WithUserNameAndPassword(config.Config.Zookeeper.Username,
config.Config.Zookeeper.Password), openKeeper.WithRoundRobin(), openKeeper.WithTimeout(10), openKeeper.WithLogger(log.NewZkLogger()))
if err != nil {
return err
@ -64,12 +61,8 @@ func run(port int) error {
return err
}
fmt.Println("api init discov client success")
buf := bytes.NewBuffer(nil)
if err := yaml.NewEncoder(buf).Encode(config.Config); err != nil {
return err
}
fmt.Println("api register public config to discov")
if err := client.RegisterConf2Registry(constant.OpenIMCommonConfigKey, buf.Bytes()); err != nil {
if err := client.RegisterConf2Registry(constant.OpenIMCommonConfigKey, config.EncodeConfig()); err != nil {
return err
}
fmt.Println("api register public config to discov success")

@ -1,117 +1,84 @@
# The class cannot be named by Pascal or camel case.
# If it is not used, the corresponding structure will not be set,
# and it will not be read naturally.
serverversion: 2.3.7
#OpenIM config
#---------------Infrastructure configuration---------------------#
zookeeper:
schema: openim #默认即可
zkAddr: [ 127.0.0.1:2181 ] #单机部署时,默认即可
userName:
password:
schema: openim #不建议修改
address: [ 127.0.0.1:2181 ] #
username: #用户名
password: #密码
mysql:
dbMysqlAddress: [ 127.0.0.1:13306 ] #mysql地址 目前仅支持单机,默认即可
dbMysqlUserName: root #mysql用户名建议修改
dbMysqlPassword: openIM123 # mysql密码建议修改
dbMysqlDatabaseName: openIM_v2 #默认即可
dbTableName: eMsg #默认即可
dbMsgTableNum: 1
dbMaxOpenConns: 100
dbMaxIdleConns: 10
dbMaxLifeTime: 5
logLevel: 4 #1=slient 2=error 3=warn 4=info
slowThreshold: 500
address: [ 127.0.0.1:13306 ] #目前仅支持单机
username: root #用户名
password: openIM123 #密码
database: openIM_v2 #不建议修改
maxOpenConn: 1000 #最大连接数
maxIdleConn: 100 #最大空闲连接数
maxLifeTime: 60 #连接可以重复使用的最长时间(秒)
logLevel: 4 #日志级别 1=slient 2=error 3=warn 4=info
slowThreshold: 500 #慢语句阈值 (毫秒)
mongo:
dbUri: ""#当dbUri值不为空则直接使用该值
dbAddress: [ 127.0.0.1:37017 ] #单机时为mongo地址使用分片集群时为mongos地址 默认即可
dbDirect: false
dbTimeout: 60
dbDatabase: openIM #mongo db 默认即可
dbSource: admin
dbUserName: root #mongo用户名建议先不设置
dbPassword: openIM123 #mongo密码建议先不设置
dbMaxPoolSize: 100
dbRetainChatRecords: 3650 #mongo保存离线消息时间根据需求修改
chatRecordsClearTime: "0 2 * * 3" # 每周三凌晨2点清除消息该配置和linux定时任务一样 清理操作建议设置在用户活跃少的时候 # 0 3 * * *
uri: #不为空则直接使用该值
address: [ 127.0.0.1:37017 ] #单机时为mongo地址使用分片集群时为mongos地址
database: openIM #mongo db 默认即可
username: root #用户名
password: openIM123 #密码
maxPoolSize: 100
redis:
dbAddress: [ 127.0.0.1:16379 ] #redis地址 单机时填写一个地址即可使用redis集群时候填写集群中多个节点地址主从地址都可以填写增加容灾能力默认即可
dbMaxIdle: 128
dbMaxActive: 0
dbIdleTimeout: 120
dbUserName: #only redis version 6.0+ need username
dbPassWord: openIM123 #redis密码 建议修改
enableCluster: false #如果外部redis以集群方式启动需要打开此开关
address: [ 127.0.0.1:16379 ] #
username: #only redis version 6.0+ need username
password: openIM123 #密码
kafka:
SASLUserName:
SASLPassword:
ws2mschat:
addr: [ 127.0.0.1:9092 ] #kafka配置默认即可
topic: "ws2ms_chat" #用于mongo和mysql保存消息
msgtomongo:
addr: [ 127.0.0.1:9092 ] #kafka配置默认即可
topic: "msg_to_mongo"
ms2pschat:
addr: [ 127.0.0.1:9092 ] #kafka配置默认即可
topic: "ms2ps_chat" #消息push
msgtomodify:
addr: [ 127.0.0.1:9092 ] #kafka配置默认即可
topic: "msg_to_modify"
consumergroupid:
msgToTransfer: mongo
msgToMongo: mongo_ex
msgToMySql: mysql
msgToPush: push
msgToModify: modify
#---------------Internal service configuration---------------------#
# The service ip default is empty,
# automatically obtain the machine's valid network card ip as the service ip,
# otherwise the configuration ip is preferred
#如果是单机模式用0.0.0.0或者不填,默认即可
#作为rpc时注册到etcd的地址单机默认即可如果是集群部署需要修改具体使用内网地址还是外网地址要依情况而定目的是api/gateway能访问到
rpcRegisterIP:
#默认即可
listenIP: 0.0.0.0
username: #用户名
password: #密码
addr: [ 127.0.0.1:9092 ] #
latestMsgToRedis:
topic: "latestMsgToRedis" #不建议修改
offlineMsgToMongo:
topic: "offlineMsgToMongoMysql" #不建议修改
msgToPush:
topic: "msqToPush" #不建议修改
msgToModify:
topic: "msgToModify" #不建议修改
consumerGroupID: #消费者组,不建议修改
msgToRedis: redis #
msgToMongo: mongo #
msgToMySql: mysql #
msgToPush: push #
msgToModify: modify #
api:
openImApiPort: [ 10002 ] #api服务端口默认即可需要开放此端口或做nginx转发
listenIP: 0.0.0.0
rpc:
registerIP: #作为rpc启动时注册到zookeeper的IPapi/gateway能访问到此ip和对应的rpcPort中的端口
listenIP: #默认为0.0.0.0
sdk:
openImSdkWsPort: [ 10003 ] #jssdk服务端口默认即可项目中使用jssdk才需开放此端口或做nginx转发
dataDir: [ ../db/sdk/ ]
openImWsAddress: ws://127.0.0.1:10001
openImApiAddress: http://127.0.0.1:10002
#对象存储服务以下配置二选一目前支持两种腾讯云和minio二者配置好其中一种即可如果使用minio参考https://doc.rentsoft.cn/#/qa/minio搭建minio服务器
credential: #腾讯cos发送图片、视频、文件时需要请自行申请后替换必须修改
api:
openImApiPort: [ 10002 ] #api服务端口
listenIP: #默认为0.0.0.0
object:
enable: minio
enable: minio #使用minio
apiURL: http://127.0.0.1:10002/third/object
minio: #MinIO 发送图片、视频、文件时需要,请自行申请后替换,必须修改。 客户端初始化InitSDK中 object_storage参数为minio
tempBucket: "openim"
dataBucket: "openim"
location: us-east-1
endpoint: http://127.0.0.1:10005 #minio外网ip 这个ip是给客户端访问的
accessKeyID: root
secretAccessKey: openIM123
isDistributedMod: false # 是否分布式多硬盘部署 默认docker-compose中为false如果是多硬盘部署需要修改为true
tencent:
minio:
tempBucket: "openim" #不建议修改
dataBucket: "openim" #不建议修改
location: us-east-1 #不建议修改
endpoint: http://127.0.0.1:10005 #minio对外服务的ip和端口app要能访问此ip和端口
accessKeyID: root #ID
secretAccessKey: openIM123 #秘钥
isDistributedMod: false #是否分布式多硬盘部署如果是多硬盘部署需要修改为true
tencent: #tencent cos
appID:
region:
bucket:
secretID:
secretKey:
ali: # ali oss
ali: #ali oss
regionID:
accessKeyID:
accessKeySecret:
@ -122,16 +89,16 @@ object:
stsDurationSeconds:
OssRoleArn:
aws:
accessKeyID: #AssumeRole用户关联的accessKeyID
accessKeySecret: #AssumeRole用户关联的accessKeySecrect
region: #分区
bucket: #桶
finalHost: #对外Host
roleArn: #RoleArn
externalId: #角色扩展Id
roleSessionName: #角色SESSION名称
rpcport: #rpc服务端口 默认即可
accessKeyID:
accessKeySecret:
region:
bucket:
finalHost:
roleArn:
externalId:
roleSessionName:
rpcPort: #rpc服务端口不建议修改端口由脚本读取后传入程序如启动多个程序只需要填入多个端口用逗号隔开如 openImUserPort: [10110, 10111]
openImUserPort: [ 10110 ]
openImFriendPort: [ 10120 ]
openImMessagePort: [ 10130 ]
@ -140,10 +107,9 @@ rpcport: #rpc服务端口 默认即可
openImAuthPort: [ 10160 ]
openImPushPort: [ 10170 ]
openImConversationPort: [ 10180 ]
openImRtcPort: [ 10190 ]
openImThirdPort: [ 10200 ]
openImThirdPort: [ 10190 ]
rpcregistername: #rpc注册服务名默认即可
rpcRegisterName: #rpc注册服务名不建议修改
openImUserName: User
openImFriendName: Friend
openImMsgName: Msg
@ -152,158 +118,138 @@ rpcregistername: #rpc注册服务名默认即可
openImGroupName: Group
openImAuthName: Auth
openImConversationName: Conversation
openImRtcName: Rtc
openImThirdName: Third
log:
storageLocation: ../logs/
rotationTime: 24
remainRotationCount: 2 #日志数量
#日志级别 6表示全都打印测试阶段建议设置为6
remainLogLevel: 6
storageLocation: ../logs/ #存放目录
rotationTime: 24 #日志旋转时间
remainRotationCount: 2 #日志数量
remainLogLevel: 6 #日志级别 6表示全都打印
isStdout: false
isJson: false
withStack: false
modulename: #日志文件按模块命名,默认即可
longConnSvrName: msg_gateway
msgTransferName: msg_transfer
pushName: push
longConnSvr:
openImWsPort: [ 10001 ] #msg_gateway的websocket端口
websocketMaxConnNum: 100000 #websocket最大连接数
websocketMaxMsgLen: 4096 #websocket请求包最大长度
websocketTimeout: 10 #websocket连接握手超时时间
longconnsvr:
openImWsPort: [ 10001 ] # ws服务端口默认即可要开放此端口或做nginx转发
websocketMaxConnNum: 10000
websocketMaxMsgLen: 4096
websocketTimeOut: 10
## 推送只能开启一个 enable代表开启
push:
tpns: #腾讯推送,暂未测试 暂不要使用
ios:
accessID: 1600018281
secretKey: 3cd68a77a95b89e5089a1aca523f318f
android:
accessID: 111
secretKey: 111
enable: false # true or false (bool)
jpns: #极光推送 在极光后台申请后,修改以下四项,必须修改
appKey:
masterSecret:
pushUrl:
pushIntent:
enable: false # true or false (bool)
getui: #个推推送
enable: getui
geTui: #个推离线推送
pushUrl: "https://restapi.getui.com/v2/$appId"
masterSecret: ""
appKey: ""
intent: ""
enable: false # true or false (bool)
channelID: ""
channelName: ""
fcm: #firebase cloud message 消息推送
serviceAccount: "openim-5c6c0-firebase-adminsdk-ppwol-8765884a78.json" #帐号文件,此处需要改修配置,并且这个文件放在 config目录下
enable: false
fcm: #fcm离线推送
serviceAccount: "x.json" #帐号文件,并放在 config目录下
jpns: #极光推送 在极光后台申请后,修改以下四项
appKey:
masterSecret:
pushUrl:
pushIntent:
manager:
#app管理员userID和对应的secret 建议修改。 用于管理后台登录也可以用户管理后台对应的api
appManagerUid: [ "openIM123456","openIM654321", "openIM333", "openIMAdmin" ]
nickname: [ "系统通知","openIM654321", "openIM333", "openIMAdmin" ]
secret: tuoyun
# 多端互踢策略
# 1多平台登录Android、iOS、Windows、Mac 每种平台只能一个在线web端可以多个同时在线
multiloginpolicy: 1
#msg log insert to db
chatpersistencemysql: true
#消息缓存时间
msgCacheTimeout: 86400
#群聊已读开启
groupMessageHasReadReceiptEnable: true
#单聊已读开启
singleMessageHasReadReceiptEnable: true
#token config
tokenpolicy:
accessSecret: "OpenIM_server" #token生成相关默认即可
# Token effective time day as a unit
accessExpire: 90 #token过期时间 默认即可
messageverify:
friendVerify: false
userID: [ "openIM123456","openIM654321","openIMAdmin" ] #内置的app管理员userID
nickname: [ "system1","system2", "system3" ] #内置的app管理员nickname
multiLoginPolicy: 1 #多平台登录Android、iOS、Windows、Mac、web 每种平台只能有一个在线
chatPersistenceMysql: true #消息是否存入mysqlmysql中的消息仅用于管理后台使用
msgCacheTimeout: 86400 #信消息缓存时间秒,不建议修改
groupMessageHasReadReceiptEnable: true #群聊已读是否开启
singleMessageHasReadReceiptEnable: true #单聊已读是否开启
retainChatRecords: 365 #mongo保存离线消息时间
chatRecordsClearTime: "0 2 * * 3" #每周三凌晨2点清理mongo中的过期超过retainChatRecords时间消息
tokenPolicy:
accessSecret: openIM123 #秘钥获取token时校验
accessExpire: 90 #过期时间(天)
messageVerify:
friendVerify: false #发送消息时是否验证好友关系
#ios系统推送声音以及标记计数
iospush:
iosPush:
pushSound: "xxx"
badgeCount: true
production: false
callback:
# callback url 需要自行更换callback url
callbackUrl: "http://127.0.0.1:10008/callback/open_im"
# 开启关闭操作前后回调的配置
callbackBeforeSendSingleMsg:
enable: false # 回调是否启用
callbackTimeOut: 2 # 回调超时时间
callbackFailedContinue: true # 回调超时是否继续执行代码
callbackAfterSendSingleMsg:
# 回调callback
url:
beforeSendSingleMsg:
enable: false #是否启用此回调事件
timeout: 5 #超时时间(秒)
failedContinue: true #如回调失败是否继续往后执行
afterSendSingleMsg:
enable: false
callbackTimeOut: 2
callbackBeforeSendGroupMsg:
timeout: 5
beforeSendGroupMsg:
enable: false
callbackTimeOut: 2
callbackFailedContinue: true
callbackAfterSendGroupMsg:
timeout: 5
failedContinue: true
afterSendGroupMsg:
enable: false
callbackTimeOut: 2
callbackMsgModify:
timeout: 5
msgModify:
enable: false
callbackTimeOut: 2
callbackFailedContinue: true
callbackUserOnline:
timeout: 5
failedContinue: true
userOnline:
enable: false
callbackTimeOut: 2
callbackUserOffline:
timeout: 5
userOffline:
enable: false
callbackTimeOut: 2
callbackUserKickOff:
timeout: 5
userKickOff:
enable: false
callbackTimeOut: 2
callbackOfflinePush:
timeout: 5
offlinePush:
enable: false
callbackTimeOut: 2
callbackFailedContinue: true # 回调超时是否继续离线推送
callbackOnlinePush:
timeout: 5
failedContinue: true
onlinePush:
enable: false
callbackTimeOut: 2
callbackFailedContinue: true # 回调超时是否继续离线推送
callbackSuperGroupOnlinePush:
timeout: 5
failedContinue: true
superGroupOnlinePush:
enable: false
callbackTimeOut: 2
callbackFailedContinue: true # 回调超时是否继续离线推送
callbackBeforeAddFriend:
timeout: 5
failedContinue: true
beforeAddFriend:
enable: false
callbackTimeOut: 2
callbackFailedContinue: true # 回调超时是否继续
callbackBeforeCreateGroup:
timeout: 5
failedContinue: true
beforeCreateGroup:
enable: false
callbackTimeOut: 2
callbackFailedContinue: true # 回调超时是否继续
callbackBeforeMemberJoinGroup:
timeout: 5
failedContinue: true
beforeMemberJoinGroup:
enable: false
callbackTimeOut: 2
callbackFailedContinue: true # 回调超时是否继续
callbackBeforeSetGroupMemberInfo:
timeout: 5
failedContinue: true
beforeSetGroupMemberInfo:
enable: false
callbackTimeOut: 2
callbackFailedContinue: true # 回调超时是否继续
callbackSetMessageReactionExtensions:
timeout: 5
failedContinue: true
setMessageReactionExtensions:
enable: false
callbackTimeOut: 2
callbackFailedContinue: true # 回调超时是否继续
timeout: 5
failedContinue: true
# prometheus每个服务监听的端口数量需要和rpc port保持一致
prometheus:
prometheus: #prometheus每个服务的端口数量需要和rpcPort保持对应
enable: false
userPrometheusPort: [ 20110 ]
friendPrometheusPort: [ 20120 ]
@ -314,5 +260,6 @@ prometheus:
pushPrometheusPort: [ 20170 ]
conversationPrometheusPort: [ 20230 ]
rtcPrometheusPort: [ 21300 ]
messageTransferPrometheusPort: [ 21400, 21401, 21402, 21403 ] # 端口数量和 script/path_info.cfg msg_transfer_service_num保持一致
thirdPrometheusPort: [ 21301 ]
messageTransferPrometheusPort: [ 21400, 21401, 21402, 21403 ] #端口数量需要和script/path_info.cfg中的msg_transfer_service_num保持一致

@ -12,7 +12,7 @@ func RunWsAndServer(rpcPort, wsPort, prometheusPort int) error {
longServer, err := NewWsServer(
WithPort(wsPort),
WithMaxConnNum(int64(config.Config.LongConnSvr.WebsocketMaxConnNum)),
WithHandshakeTimeout(time.Duration(config.Config.LongConnSvr.WebsocketTimeOut)*time.Second),
WithHandshakeTimeout(time.Duration(config.Config.LongConnSvr.WebsocketTimeout)*time.Second),
WithMessageMaxMsgLength(config.Config.LongConnSvr.WebsocketMaxMsgLen))
if err != nil {
return err

@ -48,7 +48,7 @@ func StartTransfer(prometheusPort int) error {
return err
}
client, err := openKeeper.NewClient(config.Config.Zookeeper.ZkAddr, config.Config.Zookeeper.Schema,
openKeeper.WithFreq(time.Hour), openKeeper.WithRoundRobin(), openKeeper.WithUserNameAndPassword(config.Config.Zookeeper.UserName,
openKeeper.WithFreq(time.Hour), openKeeper.WithRoundRobin(), openKeeper.WithUserNameAndPassword(config.Config.Zookeeper.Username,
config.Config.Zookeeper.Password), openKeeper.WithTimeout(10), openKeeper.WithLogger(log.NewZkLogger()))
if err != nil {
return err

@ -30,7 +30,7 @@ func NewModifyMsgConsumerHandler(database controller.ExtendMsgDatabase) *ModifyM
return &ModifyMsgConsumerHandler{
modifyMsgConsumerGroup: kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{KafkaVersion: sarama.V2_0_0_0,
OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false}, []string{config.Config.Kafka.MsgToModify.Topic},
config.Config.Kafka.MsgToModify.Addr, config.Config.Kafka.ConsumerGroupID.MsgToModify),
config.Config.Kafka.Addr, config.Config.Kafka.ConsumerGroupID.MsgToModify),
extendMsgDatabase: database,
}
}

@ -75,8 +75,8 @@ func NewOnlineHistoryRedisConsumerHandler(database controller.CommonMsgDatabase,
och.conversationRpcClient = conversationRpcClient
och.groupRpcClient = groupRpcClient
och.historyConsumerGroup = kafka.NewMConsumerGroup(&kafka.MConsumerGroupConfig{KafkaVersion: sarama.V2_0_0_0,
OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false}, []string{config.Config.Kafka.Ws2mschat.Topic},
config.Config.Kafka.Ws2mschat.Addr, config.Config.Kafka.ConsumerGroupID.MsgToRedis)
OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false}, []string{config.Config.Kafka.LatestMsgToRedis.Topic},
config.Config.Kafka.Addr, config.Config.Kafka.ConsumerGroupID.MsgToRedis)
//statistics.NewStatistics(&och.singleMsgSuccessCount, config.Config.ModuleName.MsgTransferName, fmt.Sprintf("%d second singleMsgCount insert to mongo", constant.StatisticsTimeInterval), constant.StatisticsTimeInterval)
return &och
}

@ -21,7 +21,7 @@ func NewOnlineHistoryMongoConsumerHandler(database controller.CommonMsgDatabase)
mc := &OnlineHistoryMongoConsumerHandler{
historyConsumerGroup: kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{KafkaVersion: sarama.V2_0_0_0,
OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false}, []string{config.Config.Kafka.MsgToMongo.Topic},
config.Config.Kafka.MsgToMongo.Addr, config.Config.Kafka.ConsumerGroupID.MsgToMongo),
config.Config.Kafka.Addr, config.Config.Kafka.ConsumerGroupID.MsgToMongo),
msgDatabase: database,
}
return mc

@ -29,8 +29,8 @@ type PersistentConsumerHandler struct {
func NewPersistentConsumerHandler(database controller.ChatLogDatabase) *PersistentConsumerHandler {
return &PersistentConsumerHandler{
persistentConsumerGroup: kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{KafkaVersion: sarama.V2_0_0_0,
OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false}, []string{config.Config.Kafka.Ws2mschat.Topic},
config.Config.Kafka.Ws2mschat.Addr, config.Config.Kafka.ConsumerGroupID.MsgToMySql),
OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false}, []string{config.Config.Kafka.LatestMsgToRedis.Topic},
config.Config.Kafka.Addr, config.Config.Kafka.ConsumerGroupID.MsgToMySql),
chatLogDatabase: database,
}
}

@ -124,8 +124,8 @@ func newPushReq(title, content string) PushReq {
Title: title,
Body: content,
ClickType: "startapp",
ChannelID: config.Config.Push.Getui.ChannelID,
ChannelName: config.Config.Push.Getui.ChannelName,
ChannelID: config.Config.Push.GeTui.ChannelID,
ChannelName: config.Config.Push.GeTui.ChannelName,
}}}
return pushReq
}

@ -1,8 +1,13 @@
package getui
import (
"github.com/go-redis/redis"
"sync"
"context"
"crypto/sha256"
"encoding/hex"
"errors"
"github.com/OpenIMSDK/Open-IM-Server/internal/push/offlinepush"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/config"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/cache"
@ -11,12 +16,6 @@ import (
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/mcontext"
"github.com/OpenIMSDK/Open-IM-Server/pkg/errs"
"github.com/OpenIMSDK/Open-IM-Server/pkg/utils/splitter"
"github.com/redis/go-redis/v9"
"context"
"crypto/sha256"
"encoding/hex"
"errors"
"strconv"
"time"
@ -98,12 +97,12 @@ func (g *Client) Push(ctx context.Context, userIDs []string, title, content stri
func (g *Client) Auth(ctx context.Context, timeStamp int64) (token string, expireTime int64, err error) {
h := sha256.New()
h.Write([]byte(config.Config.Push.Getui.AppKey + strconv.Itoa(int(timeStamp)) + config.Config.Push.Getui.MasterSecret))
h.Write([]byte(config.Config.Push.GeTui.AppKey + strconv.Itoa(int(timeStamp)) + config.Config.Push.GeTui.MasterSecret))
sign := hex.EncodeToString(h.Sum(nil))
reqAuth := AuthReq{
Sign: sign,
Timestamp: strconv.Itoa(int(timeStamp)),
AppKey: config.Config.Push.Getui.AppKey,
AppKey: config.Config.Push.GeTui.AppKey,
}
respAuth := AuthResp{}
err = g.request(ctx, authURL, reqAuth, "", &respAuth)
@ -146,7 +145,7 @@ func (g *Client) request(ctx context.Context, url string, input interface{}, tok
header := map[string]string{"token": token}
resp := &Resp{}
resp.Data = output
return g.postReturn(ctx, config.Config.Push.Getui.PushUrl+url, header, input, resp, 3)
return g.postReturn(ctx, config.Config.Push.GeTui.PushUrl+url, header, input, resp, 3)
}
func (g *Client) postReturn(ctx context.Context, url string, header map[string]string, input interface{}, output RespI, timeout int) error {

@ -23,7 +23,7 @@ func NewConsumerHandler(pusher *Pusher) *ConsumerHandler {
var consumerHandler ConsumerHandler
consumerHandler.pusher = pusher
consumerHandler.pushConsumerGroup = kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{KafkaVersion: sarama.V2_0_0_0,
OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false}, []string{config.Config.Kafka.Ms2pschat.Topic}, config.Config.Kafka.Ms2pschat.Addr,
OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false}, []string{config.Config.Kafka.MsgToPush.Topic}, config.Config.Kafka.Addr,
config.Config.Kafka.ConsumerGroupID.MsgToPush)
return &consumerHandler
}

@ -57,13 +57,12 @@ func NewPusher(discov discoveryregistry.SvcDiscoveryRegistry, offlinePusher offl
func NewOfflinePusher(cache cache.MsgModel) offlinepush.OfflinePusher {
var offlinePusher offlinepush.OfflinePusher
if config.Config.Push.Getui.Enable {
switch config.Config.Push.Enable {
case "getui":
offlinePusher = getui.NewClient(cache)
}
if config.Config.Push.Fcm.Enable {
case "fcm":
offlinePusher = fcm.NewClient(cache)
}
if config.Config.Push.Jpns.Enable {
case "jpush":
offlinePusher = jpush.NewClient()
}
return offlinePusher
@ -164,8 +163,8 @@ func (p *Pusher) Push2SuperGroup(ctx context.Context, groupID string, msg *sdkws
return err
}
log.ZInfo(ctx, "GroupDismissedNotificationInfo****", "groupID", groupID, "num", len(pushToUserIDs), "list", pushToUserIDs)
if len(config.Config.Manager.AppManagerUid) > 0 {
ctx = mcontext.WithOpUserIDContext(ctx, config.Config.Manager.AppManagerUid[0])
if len(config.Config.Manager.UserID) > 0 {
ctx = mcontext.WithOpUserIDContext(ctx, config.Config.Manager.UserID[0])
}
defer func(groupID string) {
if err := p.groupRpcClient.DismissGroup(ctx, groupID); err != nil {

@ -48,7 +48,7 @@ func (s *authServer) UserToken(ctx context.Context, req *pbAuth.UserTokenReq) (*
return nil, err
}
resp.Token = token
resp.ExpireTimeSeconds = config.Config.TokenPolicy.AccessExpire
resp.ExpireTimeSeconds = config.Config.TokenPolicy.AccessExpire * 24 * 60 * 60
return &resp, nil
}

@ -38,7 +38,7 @@ type MessageRevoked struct {
func (m *msgServer) messageVerification(ctx context.Context, data *msg.SendMsgReq) error {
switch data.MsgData.SessionType {
case constant.SingleChatType:
if utils.IsContain(data.MsgData.SendID, config.Config.Manager.AppManagerUid) {
if utils.IsContain(data.MsgData.SendID, config.Config.Manager.UserID) {
return nil
}
if data.MsgData.ContentType <= constant.NotificationEnd && data.MsgData.ContentType >= constant.NotificationBegin {
@ -73,7 +73,7 @@ func (m *msgServer) messageVerification(ctx context.Context, data *msg.SendMsgRe
if groupInfo.GroupType == constant.SuperGroup {
return nil
}
if utils.IsContain(data.MsgData.SendID, config.Config.Manager.AppManagerUid) {
if utils.IsContain(data.MsgData.SendID, config.Config.Manager.UserID) {
return nil
}
if data.MsgData.ContentType <= constant.NotificationEnd && data.MsgData.ContentType >= constant.NotificationBegin {

@ -45,10 +45,10 @@ func Start(client registry.SvcDiscoveryRegistry, server *grpc.Server) error {
return err
}
users := make([]*tablerelation.UserModel, 0)
if len(config.Config.Manager.AppManagerUid) != len(config.Config.Manager.Nickname) {
if len(config.Config.Manager.UserID) != len(config.Config.Manager.Nickname) {
return errs.ErrConfig.Wrap("len(config.Config.Manager.AppManagerUid) != len(config.Config.Manager.Nickname)")
}
for k, v := range config.Config.Manager.AppManagerUid {
for k, v := range config.Config.Manager.UserID {
users = append(users, &tablerelation.UserModel{UserID: v, Nickname: config.Config.Manager.Nickname[k]})
}
userDB := relation.NewUserGorm(db)

@ -11,8 +11,8 @@ import (
)
func StartCronTask() error {
log.ZInfo(context.Background(), "start cron task", "cron config", config.Config.Mongo.ChatRecordsClearTime)
fmt.Println("cron task start, config", config.Config.Mongo.ChatRecordsClearTime)
log.ZInfo(context.Background(), "start cron task", "cron config", config.Config.ChatRecordsClearTime)
fmt.Println("cron task start, config", config.Config.ChatRecordsClearTime)
msgTool, err := InitMsgTool()
if err != nil {
return err
@ -20,9 +20,9 @@ func StartCronTask() error {
c := cron.New()
var wg sync.WaitGroup
wg.Add(1)
_, err = c.AddFunc(config.Config.Mongo.ChatRecordsClearTime, msgTool.AllConversationClearMsgAndFixSeq)
_, err = c.AddFunc(config.Config.ChatRecordsClearTime, msgTool.AllConversationClearMsgAndFixSeq)
if err != nil {
fmt.Println("start cron failed", err.Error(), config.Config.Mongo.ChatRecordsClearTime)
fmt.Println("start cron failed", err.Error(), config.Config.ChatRecordsClearTime)
return err
}
c.Start()

@ -74,8 +74,8 @@ func (c *MsgTool) AllConversationClearMsgAndFixSeq() {
func (c *MsgTool) ClearConversationsMsg(ctx context.Context, conversationIDs []string) {
for _, conversationID := range conversationIDs {
if err := c.msgDatabase.DeleteConversationMsgsAndSetMinSeq(ctx, conversationID, int64(config.Config.Mongo.DBRetainChatRecords*24*60*60)); err != nil {
log.ZError(ctx, "DeleteUserSuperGroupMsgsAndSetMinSeq failed", err, "conversationID", conversationID, "DBRetainChatRecords", config.Config.Mongo.DBRetainChatRecords)
if err := c.msgDatabase.DeleteConversationMsgsAndSetMinSeq(ctx, conversationID, int64(config.Config.RetainChatRecords*24*60*60)); err != nil {
log.ZError(ctx, "DeleteUserSuperGroupMsgsAndSetMinSeq failed", err, "conversationID", conversationID, "DBRetainChatRecords", config.Config.RetainChatRecords)
}
if err := c.checkMaxSeq(ctx, conversationID); err != nil {
log.ZError(ctx, "fixSeq failed", err, "conversationID", conversationID)

@ -2,42 +2,17 @@ package config
import (
_ "embed"
"fmt"
"io/ioutil"
"os"
"path/filepath"
"runtime"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant"
"github.com/OpenIMSDK/Open-IM-Server/pkg/discoveryregistry"
"github.com/OpenIMSDK/Open-IM-Server/pkg/utils"
"gopkg.in/yaml.v3"
)
//go:embed version
var Version string
var (
_, b, _, _ = runtime.Caller(0)
// Root folder of this project
Root = filepath.Join(filepath.Dir(b), "../../..")
)
const (
FileName = "config.yaml"
NotificationFileName = "notification.yaml"
ENV = "CONFIG_NAME"
DefaultFolderPath = "../config/"
ConfKey = "conf"
)
var Config config
type CallBackConfig struct {
Enable bool `yaml:"enable"`
CallbackTimeOut int `yaml:"callbackTimeOut"`
CallbackFailedContinue *bool `yaml:"callbackFailedContinue"`
CallbackTimeOut int `yaml:"timeout"`
CallbackFailedContinue *bool `yaml:"failedContinue"`
}
type NotificationConf struct {
@ -55,22 +30,74 @@ type POfflinePush struct {
}
type config struct {
ServerIP string `yaml:"serverip"`
Zookeeper struct {
Schema string `yaml:"schema"`
ZkAddr []string `yaml:"address"`
Username string `yaml:"username"`
Password string `yaml:"password"`
} `yaml:"zookeeper"`
RpcRegisterIP string `yaml:"rpcRegisterIP"`
ListenIP string `yaml:"listenIP"`
Mysql struct {
Address []string `yaml:"address"`
Username string `yaml:"username"`
Password string `yaml:"password"`
Database string `yaml:"database"`
MaxOpenConn int `yaml:"maxOpenConn"`
MaxIdleConn int `yaml:"maxIdleConn"`
MaxLifeTime int `yaml:"maxLifeTime"`
LogLevel int `yaml:"logLevel"`
SlowThreshold int `yaml:"slowThreshold"`
} `yaml:"mysql"`
ServerVersion string `yaml:"serverversion"`
Api struct {
GinPort []int `yaml:"openImApiPort"`
ListenIP string `yaml:"listenIP"`
}
Sdk struct {
WsPort []int `yaml:"openImSdkWsPort"`
DataDir []string `yaml:"dataDir"`
}
Credential struct {
}
Mongo struct {
Uri string `yaml:"uri"`
Address []string `yaml:"address"`
Database string `yaml:"database"`
Username string `yaml:"username"`
Password string `yaml:"password"`
MaxPoolSize int `yaml:"maxPoolSize"`
} `yaml:"mongo"`
Redis struct {
Address []string `yaml:"address"`
Username string `yaml:"username"`
Password string `yaml:"password"`
} `yaml:"redis"`
Kafka struct {
Username string `yaml:"username"`
Password string `yaml:"password"`
Addr []string `yaml:"addr"`
LatestMsgToRedis struct {
Topic string `yaml:"topic"`
} `yaml:"latestMsgToRedis"`
MsgToMongo struct {
Topic string `yaml:"topic"`
} `yaml:"offlineMsgToMongo"`
MsgToPush struct {
Topic string `yaml:"topic"`
} `yaml:"msgToPush"`
MsgToModify struct {
Topic string `yaml:"topic"`
} `yaml:"msgToModify"`
ConsumerGroupID struct {
MsgToRedis string `yaml:"msgToRedis"`
MsgToMongo string `yaml:"msgToMongo"`
MsgToMySql string `yaml:"msgToMySql"`
MsgToPush string `yaml:"msgToPush"`
MsgToModify string `yaml:"msgToModify"`
} `yaml:"consumerGroupID"`
} `yaml:"kafka"`
Rpc struct {
RegisterIP string `yaml:"registerIP"`
ListenIP string `yaml:"listenIP"`
} `yaml:"rpc"`
Api struct {
OpenImApiPort []int `yaml:"openImApiPort"`
ListenIP string `yaml:"listenIP"`
} `yaml:"api"`
Object struct {
Enable string `yaml:"enable"`
@ -90,7 +117,7 @@ type config struct {
Bucket string `yaml:"bucket"`
SecretID string `yaml:"secretID"`
SecretKey string `yaml:"secretKey"`
}
} `yaml:"tencent"`
Ali struct {
RegionID string `yaml:"regionID"`
AccessKeyID string `yaml:"accessKeyID"`
@ -101,7 +128,7 @@ type config struct {
FinalHost string `yaml:"finalHost"`
StsDurationSeconds int64 `yaml:"stsDurationSeconds"`
OssRoleArn string `yaml:"OssRoleArn"`
}
} `yaml:"ali"`
Aws struct {
AccessKeyID string `yaml:"accessKeyID"`
AccessKeySecret string `yaml:"accessKeySecret"`
@ -112,43 +139,8 @@ type config struct {
ExternalId string `yaml:"externalId"`
RoleSessionName string `yaml:"roleSessionName"`
} `yaml:"aws"`
}
} `yaml:"object"`
Mysql struct {
DBAddress []string `yaml:"dbMysqlAddress"`
DBUserName string `yaml:"dbMysqlUserName"`
DBPassword string `yaml:"dbMysqlPassword"`
DBDatabaseName string `yaml:"dbMysqlDatabaseName"`
DBTableName string `yaml:"DBTableName"`
DBMsgTableNum int `yaml:"dbMsgTableNum"`
DBMaxOpenConns int `yaml:"dbMaxOpenConns"`
DBMaxIdleConns int `yaml:"dbMaxIdleConns"`
DBMaxLifeTime int `yaml:"dbMaxLifeTime"`
LogLevel int `yaml:"logLevel"`
SlowThreshold int `yaml:"slowThreshold"`
}
Mongo struct {
DBUri string `yaml:"dbUri"`
DBAddress []string `yaml:"dbAddress"`
DBDirect bool `yaml:"dbDirect"`
DBTimeout int `yaml:"dbTimeout"`
DBDatabase string `yaml:"dbDatabase"`
DBSource string `yaml:"dbSource"`
DBUserName string `yaml:"dbUserName"`
DBPassword string `yaml:"dbPassword"`
DBMaxPoolSize int `yaml:"dbMaxPoolSize"`
DBRetainChatRecords int `yaml:"dbRetainChatRecords"`
ChatRecordsClearTime string `yaml:"chatRecordsClearTime"`
}
Redis struct {
DBAddress []string `yaml:"dbAddress"`
DBMaxIdle int `yaml:"dbMaxIdle"`
DBMaxActive int `yaml:"dbMaxActive"`
DBIdleTimeout int `yaml:"dbIdleTimeout"`
DBUserName string `yaml:"dbUserName"`
DBPassWord string `yaml:"dbPassWord"`
EnableCluster bool `yaml:"enableCluster"`
}
RpcPort struct {
OpenImUserPort []int `yaml:"openImUserPort"`
OpenImFriendPort []int `yaml:"openImFriendPort"`
@ -158,10 +150,10 @@ type config struct {
OpenImAuthPort []int `yaml:"openImAuthPort"`
OpenImPushPort []int `yaml:"openImPushPort"`
OpenImConversationPort []int `yaml:"openImConversationPort"`
OpenImCachePort []int `yaml:"openImCachePort"`
OpenImRtcPort []int `yaml:"openImRtcPort"`
OpenImThirdPort []int `yaml:"openImThirdPort"`
}
} `yaml:"rpcPort"`
RpcRegisterName struct {
OpenImUserName string `yaml:"openImUserName"`
OpenImFriendName string `yaml:"openImFriendName"`
@ -171,128 +163,89 @@ type config struct {
OpenImGroupName string `yaml:"openImGroupName"`
OpenImAuthName string `yaml:"openImAuthName"`
OpenImConversationName string `yaml:"openImConversationName"`
OpenImRtcName string `yaml:"openImRtcName"`
OpenImThirdName string `yaml:"openImThirdName"`
}
Zookeeper struct {
Schema string `yaml:"schema"`
ZkAddr []string `yaml:"zkAddr"`
UserName string `yaml:"userName"`
Password string `yaml:"password"`
} `yaml:"zookeeper"`
} `yaml:"rpcRegisterName"`
Log struct {
StorageLocation string `yaml:"storageLocation"`
RotationTime int `yaml:"rotationTime"`
RemainRotationCount uint `yaml:"remainRotationCount"`
RemainLogLevel int `yaml:"remainLogLevel"`
IsStdout bool `yaml:"isStdout"`
WithStack bool `yaml:"withStack"`
IsJson bool `yaml:"isJson"`
}
ModuleName struct {
LongConnSvrName string `yaml:"longConnSvrName"`
MsgTransferName string `yaml:"msgTransferName"`
PushName string `yaml:"pushName"`
}
WithStack bool `yaml:"withStack"`
} `yaml:"log"`
LongConnSvr struct {
WebsocketPort []int `yaml:"openImWsPort"`
OpenImWsPort []int `yaml:"openImWsPort"`
WebsocketMaxConnNum int `yaml:"websocketMaxConnNum"`
WebsocketMaxMsgLen int `yaml:"websocketMaxMsgLen"`
WebsocketTimeOut int `yaml:"websocketTimeOut"`
}
WebsocketTimeout int `yaml:"websocketTimeout"`
} `yaml:"longConnSvr"`
Push struct {
Jpns struct {
AppKey string `yaml:"appKey"`
MasterSecret string `yaml:"masterSecret"`
PushUrl string `yaml:"pushUrl"`
PushIntent string `yaml:"pushIntent"`
Enable bool `yaml:"enable"`
}
Getui struct {
Enable string `yaml:"enable"`
GeTui struct {
PushUrl string `yaml:"pushUrl"`
AppKey string `yaml:"appKey"`
Enable bool `yaml:"enable"`
Intent string `yaml:"intent"`
MasterSecret string `yaml:"masterSecret"`
ChannelID string `yaml:"channelID"`
ChannelName string `yaml:"channelName"`
}
} `yaml:"geTui"`
Fcm struct {
ServiceAccount string `yaml:"serviceAccount"`
Enable bool `yaml:"enable"`
}
} `yaml:"fcm"`
Jpns struct {
AppKey string `yaml:"appKey"`
MasterSecret string `yaml:"masterSecret"`
PushUrl string `yaml:"pushUrl"`
PushIntent string `yaml:"pushIntent"`
} `yaml:"jpns"`
}
Manager struct {
AppManagerUid []string `yaml:"appManagerUid"`
Nickname []string `yaml:"nickname"`
}
UserID []string `yaml:"userID"`
Nickname []string `yaml:"nickname"`
} `yaml:"manager"`
Kafka struct {
SASLUserName string `yaml:"SASLUserName"`
SASLPassword string `yaml:"SASLPassword"`
Ws2mschat struct {
Addr []string `yaml:"addr"`
Topic string `yaml:"topic"`
}
MsgToMongo struct {
Addr []string `yaml:"addr"`
Topic string `yaml:"topic"`
}
Ms2pschat struct {
Addr []string `yaml:"addr"`
Topic string `yaml:"topic"`
}
MsgToModify struct {
Addr []string `yaml:"addr"`
Topic string `yaml:"topic"`
}
ConsumerGroupID struct {
MsgToRedis string `yaml:"msgToTransfer"`
MsgToMongo string `yaml:"msgToMongo"`
MsgToMySql string `yaml:"msgToMySql"`
MsgToPush string `yaml:"msgToPush"`
MsgToModify string `yaml:"msgToModify"`
}
}
Secret string `yaml:"secret"`
MultiLoginPolicy int `yaml:"multiloginpolicy"`
ChatPersistenceMysql bool `yaml:"chatpersistencemysql"`
MultiLoginPolicy int `yaml:"multiLoginPolicy"`
ChatPersistenceMysql bool `yaml:"chatPersistenceMysql"`
MsgCacheTimeout int `yaml:"msgCacheTimeout"`
GroupMessageHasReadReceiptEnable bool `yaml:"groupMessageHasReadReceiptEnable"`
SingleMessageHasReadReceiptEnable bool `yaml:"singleMessageHasReadReceiptEnable"`
TokenPolicy struct {
RetainChatRecords int `yaml:"retainChatRecords"`
ChatRecordsClearTime string `yaml:"chatRecordsClearTime"`
TokenPolicy struct {
AccessSecret string `yaml:"accessSecret"`
AccessExpire int64 `yaml:"accessExpire"`
}
} `yaml:"tokenPolicy"`
MessageVerify struct {
FriendVerify *bool `yaml:"friendVerify"`
}
} `yaml:"messageVerify"`
IOSPush struct {
PushSound string `yaml:"pushSound"`
BadgeCount bool `yaml:"badgeCount"`
Production bool `yaml:"production"`
}
} `yaml:"iosPush"`
Callback struct {
CallbackUrl string `yaml:"callbackUrl"`
CallbackBeforeSendSingleMsg CallBackConfig `yaml:"callbackBeforeSendSingleMsg"`
CallbackAfterSendSingleMsg CallBackConfig `yaml:"callbackAfterSendSingleMsg"`
CallbackBeforeSendGroupMsg CallBackConfig `yaml:"callbackBeforeSendGroupMsg"`
CallbackAfterSendGroupMsg CallBackConfig `yaml:"callbackAfterSendGroupMsg"`
CallbackMsgModify CallBackConfig `yaml:"callbackMsgModify"`
CallbackUserOnline CallBackConfig `yaml:"callbackUserOnline"`
CallbackUserOffline CallBackConfig `yaml:"callbackUserOffline"`
CallbackUserKickOff CallBackConfig `yaml:"callbackUserKickOff"`
CallbackOfflinePush CallBackConfig `yaml:"callbackOfflinePush"`
CallbackOnlinePush CallBackConfig `yaml:"callbackOnlinePush"`
CallbackBeforeSuperGroupOnlinePush CallBackConfig `yaml:"callbackSuperGroupOnlinePush"`
CallbackBeforeAddFriend CallBackConfig `yaml:"callbackBeforeAddFriend"`
CallbackBeforeCreateGroup CallBackConfig `yaml:"callbackBeforeCreateGroup"`
CallbackBeforeMemberJoinGroup CallBackConfig `yaml:"callbackBeforeMemberJoinGroup"`
CallbackBeforeSetGroupMemberInfo CallBackConfig `yaml:"callbackBeforeSetGroupMemberInfo"`
CallbackUrl string `yaml:"url"`
CallbackBeforeSendSingleMsg CallBackConfig `yaml:"beforeSendSingleMsg"`
CallbackAfterSendSingleMsg CallBackConfig `yaml:"afterSendSingleMsg"`
CallbackBeforeSendGroupMsg CallBackConfig `yaml:"beforeSendGroupMsg"`
CallbackAfterSendGroupMsg CallBackConfig `yaml:"afterSendGroupMsg"`
CallbackMsgModify CallBackConfig `yaml:"msgModify"`
CallbackUserOnline CallBackConfig `yaml:"userOnline"`
CallbackUserOffline CallBackConfig `yaml:"userOffline"`
CallbackUserKickOff CallBackConfig `yaml:"userKickOff"`
CallbackOfflinePush CallBackConfig `yaml:"offlinePush"`
CallbackOnlinePush CallBackConfig `yaml:"onlinePush"`
CallbackBeforeSuperGroupOnlinePush CallBackConfig `yaml:"superGroupOnlinePush"`
CallbackBeforeAddFriend CallBackConfig `yaml:"beforeAddFriend"`
CallbackBeforeCreateGroup CallBackConfig `yaml:"beforeCreateGroup"`
CallbackBeforeMemberJoinGroup CallBackConfig `yaml:"beforeMemberJoinGroup"`
CallbackBeforeSetGroupMemberInfo CallBackConfig `yaml:"beforeSetGroupMemberInfo"`
} `yaml:"callback"`
Notification Notification `yaml:"notification"`
Prometheus struct {
Enable bool `yaml:"enable"`
@ -308,9 +261,10 @@ type config struct {
MessageTransferPrometheusPort []int `yaml:"messageTransferPrometheusPort"`
ThirdPrometheusPort []int `yaml:"thirdPrometheusPort"`
} `yaml:"prometheus"`
Notification notification `yaml:"notification"`
}
type Notification struct {
type notification struct {
GroupCreated NotificationConf `yaml:"groupCreated"`
GroupInfoSet NotificationConf `yaml:"groupInfoSet"`
JoinGroupApplication NotificationConf `yaml:"joinGroupApplication"`
@ -347,82 +301,3 @@ type Notification struct {
ConversationChanged NotificationConf `yaml:"conversationChanged"`
ConversationSetPrivate NotificationConf `yaml:"conversationSetPrivate"`
}
func GetServiceNames() []string {
return []string{Config.RpcRegisterName.OpenImUserName, Config.RpcRegisterName.OpenImFriendName, Config.RpcRegisterName.OpenImMsgName,
Config.RpcRegisterName.OpenImPushName, Config.RpcRegisterName.OpenImMessageGatewayName, Config.RpcRegisterName.OpenImGroupName,
Config.RpcRegisterName.OpenImAuthName, Config.RpcRegisterName.OpenImConversationName, Config.RpcRegisterName.OpenImRtcName,
Config.RpcRegisterName.OpenImThirdName}
}
func GetOptionsByNotification(cfg NotificationConf) utils.Options {
opts := utils.NewOptions()
if cfg.UnreadCount {
opts = utils.WithOptions(opts, utils.WithUnreadCount(true))
}
if cfg.OfflinePush.Enable {
opts = utils.WithOptions(opts, utils.WithOfflinePush(true))
}
switch cfg.ReliabilityLevel {
case constant.UnreliableNotification:
case constant.ReliableNotificationNoMsg:
opts = utils.WithOptions(opts, utils.WithHistory(true), utils.WithPersistent())
}
opts = utils.WithOptions(opts, utils.WithSendMsg(cfg.IsSendMsg))
return opts
}
func (c *config) unmarshalConfig(config interface{}, configPath string) error {
bytes, err := ioutil.ReadFile(configPath)
if err != nil {
return err
}
if err = yaml.Unmarshal(bytes, config); err != nil {
return err
}
return nil
}
func (c *config) initConfig(config interface{}, configName, configFolderPath string) error {
if configFolderPath == "" {
configFolderPath = DefaultFolderPath
}
configPath := filepath.Join(configFolderPath, configName)
defer func() {
fmt.Println("use config", configPath)
}()
_, err := os.Stat(configPath)
if err != nil {
if !os.IsNotExist(err) {
return err
}
configPath = filepath.Join(Root, "config", configName)
} else {
Root = filepath.Dir(configPath)
}
return c.unmarshalConfig(config, configPath)
}
func (c *config) RegisterConf2Registry(registry discoveryregistry.SvcDiscoveryRegistry) error {
bytes, err := yaml.Marshal(Config)
if err != nil {
return err
}
return registry.RegisterConf2Registry(ConfKey, bytes)
}
func (c *config) GetConfFromRegistry(registry discoveryregistry.SvcDiscoveryRegistry) ([]byte, error) {
return registry.GetConfFromRegistry(ConfKey)
}
func InitConfig(configFolderPath string) error {
err := Config.initConfig(&Config, FileName, configFolderPath)
if err != nil {
return err
}
err = Config.initConfig(&Config.Notification, NotificationFileName, configFolderPath)
if err != nil {
return err
}
return nil
}

@ -0,0 +1,107 @@
package config
import (
"bytes"
"fmt"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant"
"github.com/OpenIMSDK/Open-IM-Server/pkg/discoveryregistry"
"github.com/OpenIMSDK/Open-IM-Server/pkg/utils"
"gopkg.in/yaml.v3"
"os"
"path/filepath"
"runtime"
)
var (
_, b, _, _ = runtime.Caller(0)
// Root folder of this project
Root = filepath.Join(filepath.Dir(b), "../../..")
)
const (
FileName = "config.yaml"
NotificationFileName = "notification.yaml"
ENV = "CONFIG_NAME"
DefaultFolderPath = "../config/"
ConfKey = "conf"
)
func GetOptionsByNotification(cfg NotificationConf) utils.Options {
opts := utils.NewOptions()
if cfg.UnreadCount {
opts = utils.WithOptions(opts, utils.WithUnreadCount(true))
}
if cfg.OfflinePush.Enable {
opts = utils.WithOptions(opts, utils.WithOfflinePush(true))
}
switch cfg.ReliabilityLevel {
case constant.UnreliableNotification:
case constant.ReliableNotificationNoMsg:
opts = utils.WithOptions(opts, utils.WithHistory(true), utils.WithPersistent())
}
opts = utils.WithOptions(opts, utils.WithSendMsg(cfg.IsSendMsg))
return opts
}
func (c *config) unmarshalConfig(config interface{}, configPath string) error {
bytes, err := os.ReadFile(configPath)
if err != nil {
return err
}
if err = yaml.Unmarshal(bytes, config); err != nil {
return err
}
return nil
}
func (c *config) initConfig(config interface{}, configName, configFolderPath string) error {
if configFolderPath == "" {
configFolderPath = DefaultFolderPath
}
configPath := filepath.Join(configFolderPath, configName)
defer func() {
fmt.Println("use config", configPath)
}()
_, err := os.Stat(configPath)
if err != nil {
if !os.IsNotExist(err) {
return err
}
configPath = filepath.Join(Root, "config", configName)
} else {
Root = filepath.Dir(configPath)
}
return c.unmarshalConfig(config, configPath)
}
func (c *config) RegisterConf2Registry(registry discoveryregistry.SvcDiscoveryRegistry) error {
bytes, err := yaml.Marshal(Config)
if err != nil {
return err
}
return registry.RegisterConf2Registry(ConfKey, bytes)
}
func (c *config) GetConfFromRegistry(registry discoveryregistry.SvcDiscoveryRegistry) ([]byte, error) {
return registry.GetConfFromRegistry(ConfKey)
}
func InitConfig(configFolderPath string) error {
err := Config.initConfig(&Config, FileName, configFolderPath)
if err != nil {
return err
}
err = Config.initConfig(&Config.Notification, NotificationFileName, configFolderPath)
if err != nil {
return err
}
return nil
}
func EncodeConfig() []byte {
buf := bytes.NewBuffer(nil)
if err := yaml.NewEncoder(buf).Encode(Config); err != nil {
panic(err)
}
return buf.Bytes()
}

@ -2,6 +2,7 @@ package cache
import (
"context"
"errors"
"fmt"
"time"
@ -12,22 +13,25 @@ import (
)
func NewRedis() (redis.UniversalClient, error) {
if len(config.Config.Redis.Address) == 0 {
return nil, errors.New("redis address is empty")
}
specialerror.AddReplace(redis.Nil, errs.ErrRecordNotFound)
var rdb redis.UniversalClient
if config.Config.Redis.EnableCluster {
if len(config.Config.Redis.Address) > 1 {
rdb = redis.NewClusterClient(&redis.ClusterOptions{
Addrs: config.Config.Redis.DBAddress,
Username: config.Config.Redis.DBUserName,
Password: config.Config.Redis.DBPassWord, // no password set
Addrs: config.Config.Redis.Address,
Username: config.Config.Redis.Username,
Password: config.Config.Redis.Password, // no password set
PoolSize: 50,
})
} else {
rdb = redis.NewClient(&redis.Options{
Addr: config.Config.Redis.DBAddress[0],
Username: config.Config.Redis.DBUserName,
Password: config.Config.Redis.DBPassWord, // no password set
DB: 0, // use default DB
PoolSize: 100, // 连接池大小
Addr: config.Config.Redis.Address[0],
Username: config.Config.Redis.Username,
Password: config.Config.Redis.Password, // no password set
DB: 0, // use default DB
PoolSize: 100, // 连接池大小
})
}
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)

@ -2,6 +2,7 @@ package controller
import (
"fmt"
"github.com/redis/go-redis/v9"
"time"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/config"
@ -20,7 +21,6 @@ import (
pbMsg "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/msg"
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws"
"github.com/OpenIMSDK/Open-IM-Server/pkg/utils"
"github.com/redis/go-redis/v9"
"go.mongodb.org/mongo-driver/mongo"
)
@ -98,10 +98,10 @@ func NewCommonMsgDatabase(msgDocModel unRelationTb.MsgDocModelInterface, cacheMo
return &commonMsgDatabase{
msgDocDatabase: msgDocModel,
cache: cacheModel,
producer: kafka.NewKafkaProducer(config.Config.Kafka.Ws2mschat.Addr, config.Config.Kafka.Ws2mschat.Topic),
producerToMongo: kafka.NewKafkaProducer(config.Config.Kafka.MsgToMongo.Addr, config.Config.Kafka.MsgToMongo.Topic),
producerToPush: kafka.NewKafkaProducer(config.Config.Kafka.Ms2pschat.Addr, config.Config.Kafka.Ms2pschat.Topic),
producerToModify: kafka.NewKafkaProducer(config.Config.Kafka.MsgToModify.Addr, config.Config.Kafka.MsgToModify.Topic),
producer: kafka.NewKafkaProducer(config.Config.Kafka.Addr, config.Config.Kafka.LatestMsgToRedis.Topic),
producerToMongo: kafka.NewKafkaProducer(config.Config.Kafka.Addr, config.Config.Kafka.MsgToMongo.Topic),
producerToPush: kafka.NewKafkaProducer(config.Config.Kafka.Addr, config.Config.Kafka.MsgToPush.Topic),
producerToModify: kafka.NewKafkaProducer(config.Config.Kafka.Addr, config.Config.Kafka.MsgToModify.Topic),
}
}

@ -18,15 +18,15 @@ import (
)
func Test_BatchInsertChat2DB(t *testing.T) {
config.Config.Mongo.DBAddress = []string{"192.168.44.128:37017"}
config.Config.Mongo.DBTimeout = 60
config.Config.Mongo.DBDatabase = "openIM"
config.Config.Mongo.DBSource = "admin"
config.Config.Mongo.DBUserName = "root"
config.Config.Mongo.DBPassword = "openIM123"
config.Config.Mongo.DBMaxPoolSize = 100
config.Config.Mongo.DBRetainChatRecords = 3650
config.Config.Mongo.ChatRecordsClearTime = "0 2 * * 3"
config.Config.Mongo.Address = []string{"192.168.44.128:37017"}
config.Config.Mongo.Timeout = 60
config.Config.Mongo.Database = "openIM"
config.Config.Mongo.Source = "admin"
config.Config.Mongo.Username = "root"
config.Config.Mongo.Password = "openIM123"
config.Config.Mongo.MaxPoolSize = 100
config.Config.RetainChatRecords = 3650
config.Config.ChatRecordsClearTime = "0 2 * * 3"
mongo, err := unrelation.NewMongo()
if err != nil {
@ -129,15 +129,15 @@ func Test_BatchInsertChat2DB(t *testing.T) {
}
func GetDB() *commonMsgDatabase {
config.Config.Mongo.DBAddress = []string{"192.168.44.128:37017"}
config.Config.Mongo.DBTimeout = 60
config.Config.Mongo.DBDatabase = "openIM"
config.Config.Mongo.DBSource = "admin"
config.Config.Mongo.DBUserName = "root"
config.Config.Mongo.DBPassword = "openIM123"
config.Config.Mongo.DBMaxPoolSize = 100
config.Config.Mongo.DBRetainChatRecords = 3650
config.Config.Mongo.ChatRecordsClearTime = "0 2 * * 3"
config.Config.Mongo.Address = []string{"192.168.44.128:37017"}
config.Config.Mongo.Timeout = 60
config.Config.Mongo.Database = "openIM"
config.Config.Mongo.Source = "admin"
config.Config.Mongo.Username = "root"
config.Config.Mongo.Password = "openIM123"
config.Config.Mongo.MaxPoolSize = 100
config.Config.RetainChatRecords = 3650
config.Config.ChatRecordsClearTime = "0 2 * * 3"
mongo, err := unrelation.NewMongo()
if err != nil {

@ -16,7 +16,7 @@ import (
func newMysqlGormDB() (*gorm.DB, error) {
dsn := fmt.Sprintf("%s:%s@tcp(%s)/%s?charset=utf8mb4&parseTime=true&loc=Local",
config.Config.Mysql.DBUserName, config.Config.Mysql.DBPassword, config.Config.Mysql.DBAddress[0], "mysql")
config.Config.Mysql.Username, config.Config.Mysql.Password, config.Config.Mysql.Address[0], "mysql")
db, err := gorm.Open(mysql.Open(dsn), nil)
if err != nil {
time.Sleep(time.Duration(30) * time.Second)
@ -30,13 +30,13 @@ func newMysqlGormDB() (*gorm.DB, error) {
return nil, err
}
defer sqlDB.Close()
sql := fmt.Sprintf("CREATE DATABASE IF NOT EXISTS %s default charset utf8mb4 COLLATE utf8mb4_unicode_ci;", config.Config.Mysql.DBDatabaseName)
sql := fmt.Sprintf("CREATE DATABASE IF NOT EXISTS %s default charset utf8mb4 COLLATE utf8mb4_unicode_ci;", config.Config.Mysql.Database)
err = db.Exec(sql).Error
if err != nil {
return nil, fmt.Errorf("init db %w", err)
}
dsn = fmt.Sprintf("%s:%s@tcp(%s)/%s?charset=utf8mb4&parseTime=true&loc=Local",
config.Config.Mysql.DBUserName, config.Config.Mysql.DBPassword, config.Config.Mysql.DBAddress[0], config.Config.Mysql.DBDatabaseName)
config.Config.Mysql.Username, config.Config.Mysql.Password, config.Config.Mysql.Address[0], config.Config.Mysql.Database)
sqlLogger := log.NewSqlLogger(logger.LogLevel(config.Config.Mysql.LogLevel), true, time.Duration(config.Config.Mysql.SlowThreshold)*time.Millisecond)
db, err = gorm.Open(mysql.Open(dsn), &gorm.Config{
Logger: sqlLogger,
@ -48,9 +48,9 @@ func newMysqlGormDB() (*gorm.DB, error) {
if err != nil {
return nil, err
}
sqlDB.SetConnMaxLifetime(time.Second * time.Duration(config.Config.Mysql.DBMaxLifeTime))
sqlDB.SetMaxOpenConns(config.Config.Mysql.DBMaxOpenConns)
sqlDB.SetMaxIdleConns(config.Config.Mysql.DBMaxIdleConns)
sqlDB.SetConnMaxLifetime(time.Second * time.Duration(config.Config.Mysql.MaxLifeTime))
sqlDB.SetMaxOpenConns(config.Config.Mysql.MaxOpenConn)
sqlDB.SetMaxIdleConns(config.Config.Mysql.MaxIdleConn)
return db, nil
}

@ -23,27 +23,27 @@ type Mongo struct {
func NewMongo() (*Mongo, error) {
specialerror.AddReplace(mongo.ErrNoDocuments, errs.ErrRecordNotFound)
uri := "mongodb://sample.host:27017/?maxPoolSize=20&w=majority"
if config.Config.Mongo.DBUri != "" {
if config.Config.Mongo.Uri != "" {
// example: mongodb://$user:$password@mongo1.mongo:27017,mongo2.mongo:27017,mongo3.mongo:27017/$DBDatabase/?replicaSet=rs0&readPreference=secondary&authSource=admin&maxPoolSize=$DBMaxPoolSize
uri = config.Config.Mongo.DBUri
uri = config.Config.Mongo.Uri
} else {
//mongodb://mongodb1.example.com:27317,mongodb2.example.com:27017/?replicaSet=mySet&authSource=authDB
mongodbHosts := ""
for i, v := range config.Config.Mongo.DBAddress {
if i == len(config.Config.Mongo.DBAddress)-1 {
for i, v := range config.Config.Mongo.Address {
if i == len(config.Config.Mongo.Address)-1 {
mongodbHosts += v
} else {
mongodbHosts += v + ","
}
}
if config.Config.Mongo.DBPassword != "" && config.Config.Mongo.DBUserName != "" {
if config.Config.Mongo.Password != "" && config.Config.Mongo.Username != "" {
uri = fmt.Sprintf("mongodb://%s:%s@%s/%s?maxPoolSize=%d&authSource=admin",
config.Config.Mongo.DBUserName, config.Config.Mongo.DBPassword, mongodbHosts,
config.Config.Mongo.DBDatabase, config.Config.Mongo.DBMaxPoolSize)
config.Config.Mongo.Username, config.Config.Mongo.Password, mongodbHosts,
config.Config.Mongo.Database, config.Config.Mongo.MaxPoolSize)
} else {
uri = fmt.Sprintf("mongodb://%s/%s/?maxPoolSize=%d&authSource=admin",
mongodbHosts, config.Config.Mongo.DBDatabase,
config.Config.Mongo.DBMaxPoolSize)
mongodbHosts, config.Config.Mongo.Database,
config.Config.Mongo.MaxPoolSize)
}
}
fmt.Println("mongo:", uri)
@ -61,7 +61,7 @@ func (m *Mongo) GetClient() *mongo.Client {
}
func (m *Mongo) GetDatabase() *mongo.Database {
return m.db.Database(config.Config.Mongo.DBDatabase)
return m.db.Database(config.Config.Mongo.Database)
}
func (m *Mongo) CreateMsgIndex() error {
@ -83,7 +83,7 @@ func (m *Mongo) CreateExtendMsgSetIndex() error {
}
func (m *Mongo) createMongoIndex(collection string, isUnique bool, keys ...string) error {
db := m.db.Database(config.Config.Mongo.DBDatabase).Collection(collection)
db := m.db.Database(config.Config.Mongo.Database).Collection(collection)
opts := options.CreateIndexes().SetMaxTime(10 * time.Second)
indexView := db.Indexes()
keysDoc := bsonx.Doc{}

@ -21,10 +21,10 @@ func NewKafkaConsumer(addr []string, topic string) *Consumer {
p.Topic = topic
p.addr = addr
consumerConfig := sarama.NewConfig()
if config.Config.Kafka.SASLUserName != "" && config.Config.Kafka.SASLPassword != "" {
if config.Config.Kafka.Username != "" && config.Config.Kafka.Password != "" {
consumerConfig.Net.SASL.Enable = true
consumerConfig.Net.SASL.User = config.Config.Kafka.SASLUserName
consumerConfig.Net.SASL.Password = config.Config.Kafka.SASLPassword
consumerConfig.Net.SASL.User = config.Config.Kafka.Username
consumerConfig.Net.SASL.Password = config.Config.Kafka.Password
}
consumer, err := sarama.NewConsumer(p.addr, consumerConfig)
if err != nil {

@ -32,10 +32,10 @@ func NewKafkaProducer(addr []string, topic string) *Producer {
p.config.Producer.Return.Errors = true
p.config.Producer.RequiredAcks = sarama.WaitForAll //Set producer Message Reply level 0 1 all
p.config.Producer.Partitioner = sarama.NewHashPartitioner //Set the hash-key automatic hash partition. When sending a message, you must specify the key value of the message. If there is no key, the partition will be selected randomly
if config.Config.Kafka.SASLUserName != "" && config.Config.Kafka.SASLPassword != "" {
if config.Config.Kafka.Username != "" && config.Config.Kafka.Password != "" {
p.config.Net.SASL.Enable = true
p.config.Net.SASL.User = config.Config.Kafka.SASLUserName
p.config.Net.SASL.Password = config.Config.Kafka.SASLPassword
p.config.Net.SASL.User = config.Config.Kafka.Username
p.config.Net.SASL.Password = config.Config.Kafka.Password
}
p.addr = addr
p.topic = topic

@ -26,7 +26,7 @@ func init() {
func initAesKey() {
once.Do(func() {
key := md5.Sum([]byte("openim:" + config.Config.Secret))
key := md5.Sum([]byte("openim:" + config.Config.TokenPolicy.AccessSecret))
var err error
block, err = aes.NewCipher(key[:])
if err != nil {

@ -7,7 +7,7 @@ import (
)
func TestCheck(t *testing.T) {
config.Config.Secret = "123456"
config.Config.TokenPolicy.Secret = "123456"
args := []string{"1", "2", "3"}

@ -62,7 +62,7 @@ func GetClaimFromToken(tokensString string) (*Claims, error) {
func CheckAccessV3(ctx context.Context, ownerUserID string) (err error) {
opUserID := mcontext.GetOpUserID(ctx)
if utils.IsContain(opUserID, config.Config.Manager.AppManagerUid) {
if utils.IsContain(opUserID, config.Config.Manager.UserID) {
return nil
}
if opUserID == ownerUserID {
@ -72,11 +72,11 @@ func CheckAccessV3(ctx context.Context, ownerUserID string) (err error) {
}
func IsAppManagerUid(ctx context.Context) bool {
return utils.IsContain(mcontext.GetOpUserID(ctx), config.Config.Manager.AppManagerUid)
return utils.IsContain(mcontext.GetOpUserID(ctx), config.Config.Manager.UserID)
}
func CheckAdmin(ctx context.Context) error {
if utils.IsContain(mcontext.GetOpUserID(ctx), config.Config.Manager.AppManagerUid) {
if utils.IsContain(mcontext.GetOpUserID(ctx), config.Config.Manager.UserID) {
return nil
}
return errs.ErrIdentity.Wrap(fmt.Sprintf("user %s is not admin userID", mcontext.GetOpUserID(ctx)))
@ -86,7 +86,7 @@ func ParseRedisInterfaceToken(redisToken interface{}) (*Claims, error) {
return GetClaimFromToken(string(redisToken.([]uint8)))
}
func IsManagerUserID(opUserID string) bool {
return utils.IsContain(opUserID, config.Config.Manager.AppManagerUid)
return utils.IsContain(opUserID, config.Config.Manager.UserID)
}
func WsVerifyToken(token, userID string, platformID int) error {
claim, err := GetClaimFromToken(token)

@ -21,23 +21,20 @@ import (
func Start(rpcPort int, rpcRegisterName string, prometheusPort int, rpcFn func(client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) error, options ...grpc.ServerOption) error {
fmt.Println("start", rpcRegisterName, "server, port: ", rpcPort, "prometheusPort:", prometheusPort, ", OpenIM version: ", config.Version)
listener, err := net.Listen("tcp", net.JoinHostPort(config.Config.ListenIP, strconv.Itoa(rpcPort)))
listener, err := net.Listen("tcp", net.JoinHostPort(network.GetListenIP(config.Config.Rpc.ListenIP), strconv.Itoa(rpcPort)))
if err != nil {
return err
}
defer listener.Close()
client, err := openKeeper.NewClient(config.Config.Zookeeper.ZkAddr, config.Config.Zookeeper.Schema,
openKeeper.WithFreq(time.Hour), openKeeper.WithUserNameAndPassword(config.Config.Zookeeper.UserName,
zkClient, err := openKeeper.NewClient(config.Config.Zookeeper.ZkAddr, config.Config.Zookeeper.Schema,
openKeeper.WithFreq(time.Hour), openKeeper.WithUserNameAndPassword(config.Config.Zookeeper.Username,
config.Config.Zookeeper.Password), openKeeper.WithRoundRobin(), openKeeper.WithTimeout(10), openKeeper.WithLogger(log.NewZkLogger()))
if err != nil {
return utils.Wrap1(err)
}
if client.CreateRpcRootNodes(config.GetServiceNames()); err != nil {
return err
}
defer client.CloseZK()
client.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()))
registerIP, err := network.GetRpcRegisterIP(config.Config.RpcRegisterIP)
defer zkClient.CloseZK()
zkClient.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()))
registerIP, err := network.GetRpcRegisterIP(config.Config.Rpc.RegisterIP)
if err != nil {
return err
}
@ -56,11 +53,11 @@ func Start(rpcPort int, rpcRegisterName string, prometheusPort int, rpcFn func(c
}
srv := grpc.NewServer(options...)
defer srv.GracefulStop()
err = rpcFn(client, srv)
err = rpcFn(zkClient, srv)
if err != nil {
return utils.Wrap1(err)
}
err = client.Register(rpcRegisterName, registerIP, rpcPort, grpc.WithTransportCredentials(insecure.NewCredentials()))
err = zkClient.Register(rpcRegisterName, registerIP, rpcPort, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
return utils.Wrap1(err)
}

Loading…
Cancel
Save