diff --git a/cmd/api/main.go b/cmd/api/main.go index 05806f62c..6f56a7704 100644 --- a/cmd/api/main.go +++ b/cmd/api/main.go @@ -1,7 +1,6 @@ package main import ( - "bytes" "context" "fmt" "net" @@ -10,8 +9,6 @@ import ( "strconv" "time" - "gopkg.in/yaml.v3" - "net/http" _ "net/http/pprof" @@ -55,7 +52,7 @@ func run(port int) error { fmt.Println("api start init discov client") var client discoveryregistry.SvcDiscoveryRegistry client, err = openKeeper.NewClient(config.Config.Zookeeper.ZkAddr, config.Config.Zookeeper.Schema, - openKeeper.WithFreq(time.Hour), openKeeper.WithUserNameAndPassword(config.Config.Zookeeper.UserName, + openKeeper.WithFreq(time.Hour), openKeeper.WithUserNameAndPassword(config.Config.Zookeeper.Username, config.Config.Zookeeper.Password), openKeeper.WithRoundRobin(), openKeeper.WithTimeout(10), openKeeper.WithLogger(log.NewZkLogger())) if err != nil { return err @@ -64,12 +61,8 @@ func run(port int) error { return err } fmt.Println("api init discov client success") - buf := bytes.NewBuffer(nil) - if err := yaml.NewEncoder(buf).Encode(config.Config); err != nil { - return err - } fmt.Println("api register public config to discov") - if err := client.RegisterConf2Registry(constant.OpenIMCommonConfigKey, buf.Bytes()); err != nil { + if err := client.RegisterConf2Registry(constant.OpenIMCommonConfigKey, config.EncodeConfig()); err != nil { return err } fmt.Println("api register public config to discov success") diff --git a/config/config.yaml b/config/config.yaml index ce58921a8..c3c9282c8 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -1,117 +1,84 @@ -# The class cannot be named by Pascal or camel case. -# If it is not used, the corresponding structure will not be set, -# and it will not be read naturally. -serverversion: 2.3.7 +#OpenIM config + + #---------------Infrastructure configuration---------------------# zookeeper: - schema: openim #默认即可 - zkAddr: [ 127.0.0.1:2181 ] #单机部署时,默认即可 - userName: - password: + schema: openim #不建议修改 + address: [ 127.0.0.1:2181 ] # + username: #用户名 + password: #密码 mysql: - dbMysqlAddress: [ 127.0.0.1:13306 ] #mysql地址 目前仅支持单机,默认即可 - dbMysqlUserName: root #mysql用户名,建议修改 - dbMysqlPassword: openIM123 # mysql密码,建议修改 - dbMysqlDatabaseName: openIM_v2 #默认即可 - dbTableName: eMsg #默认即可 - dbMsgTableNum: 1 - dbMaxOpenConns: 100 - dbMaxIdleConns: 10 - dbMaxLifeTime: 5 - logLevel: 4 #1=slient 2=error 3=warn 4=info - slowThreshold: 500 + address: [ 127.0.0.1:13306 ] #目前仅支持单机 + username: root #用户名 + password: openIM123 #密码 + database: openIM_v2 #不建议修改 + maxOpenConn: 1000 #最大连接数 + maxIdleConn: 100 #最大空闲连接数 + maxLifeTime: 60 #连接可以重复使用的最长时间(秒) + logLevel: 4 #日志级别 1=slient 2=error 3=warn 4=info + slowThreshold: 500 #慢语句阈值 (毫秒) mongo: - dbUri: ""#当dbUri值不为空则直接使用该值 - dbAddress: [ 127.0.0.1:37017 ] #单机时为mongo地址,使用分片集群时,为mongos地址 默认即可 - dbDirect: false - dbTimeout: 60 - dbDatabase: openIM #mongo db 默认即可 - dbSource: admin - dbUserName: root #mongo用户名,建议先不设置 - dbPassword: openIM123 #mongo密码,建议先不设置 - dbMaxPoolSize: 100 - dbRetainChatRecords: 3650 #mongo保存离线消息时间(天),根据需求修改 - chatRecordsClearTime: "0 2 * * 3" # 每周三凌晨2点清除消息,该配置和linux定时任务一样, 清理操作建议设置在用户活跃少的时候 # 0 3 * * * + uri: #不为空则直接使用该值 + address: [ 127.0.0.1:37017 ] #单机时为mongo地址,使用分片集群时,为mongos地址 + database: openIM #mongo db 默认即可 + username: root #用户名 + password: openIM123 #密码 + maxPoolSize: 100 redis: - dbAddress: [ 127.0.0.1:16379 ] #redis地址 单机时,填写一个地址即可,使用redis集群时候,填写集群中多个节点地址(主从地址都可以填写,增加容灾能力),默认即可 - dbMaxIdle: 128 - dbMaxActive: 0 - dbIdleTimeout: 120 - dbUserName: #only redis version 6.0+ need username - dbPassWord: openIM123 #redis密码 建议修改 - enableCluster: false #如果外部redis以集群方式启动,需要打开此开关 + address: [ 127.0.0.1:16379 ] # + username: #only redis version 6.0+ need username + password: openIM123 #密码 kafka: - SASLUserName: - SASLPassword: - ws2mschat: - addr: [ 127.0.0.1:9092 ] #kafka配置,默认即可 - topic: "ws2ms_chat" #用于mongo和mysql保存消息 - msgtomongo: - addr: [ 127.0.0.1:9092 ] #kafka配置,默认即可 - topic: "msg_to_mongo" - ms2pschat: - addr: [ 127.0.0.1:9092 ] #kafka配置,默认即可 - topic: "ms2ps_chat" #消息push - msgtomodify: - addr: [ 127.0.0.1:9092 ] #kafka配置,默认即可 - topic: "msg_to_modify" - consumergroupid: - msgToTransfer: mongo - msgToMongo: mongo_ex - msgToMySql: mysql - msgToPush: push - msgToModify: modify - - -#---------------Internal service configuration---------------------# - -# The service ip default is empty, -# automatically obtain the machine's valid network card ip as the service ip, -# otherwise the configuration ip is preferred -#如果是单机模式,用0.0.0.0或者不填,默认即可 - -#作为rpc时,注册到etcd的地址,单机默认即可,如果是集群部署,需要修改(具体使用内网地址还是外网地址,要依情况而定,目的是api/gateway能访问到) -rpcRegisterIP: -#默认即可 -listenIP: 0.0.0.0 + username: #用户名 + password: #密码 + addr: [ 127.0.0.1:9092 ] # + latestMsgToRedis: + topic: "latestMsgToRedis" #不建议修改 + offlineMsgToMongo: + topic: "offlineMsgToMongoMysql" #不建议修改 + msgToPush: + topic: "msqToPush" #不建议修改 + msgToModify: + topic: "msgToModify" #不建议修改 + consumerGroupID: #消费者组,不建议修改 + msgToRedis: redis # + msgToMongo: mongo # + msgToMySql: mysql # + msgToPush: push # + msgToModify: modify # -api: - openImApiPort: [ 10002 ] #api服务端口,默认即可,需要开放此端口或做nginx转发 - listenIP: 0.0.0.0 +rpc: + registerIP: #作为rpc启动时,注册到zookeeper的IP,api/gateway能访问到此ip和对应的rpcPort中的端口 + listenIP: #默认为0.0.0.0 -sdk: - openImSdkWsPort: [ 10003 ] #jssdk服务端口,默认即可,项目中使用jssdk才需开放此端口或做nginx转发 - dataDir: [ ../db/sdk/ ] - openImWsAddress: ws://127.0.0.1:10001 - openImApiAddress: http://127.0.0.1:10002 - -#对象存储服务,以下配置二选一,目前支持两种,腾讯云和minio,二者配置好其中一种即可(如果使用minio参考https://doc.rentsoft.cn/#/qa/minio搭建minio服务器) -credential: #腾讯cos,发送图片、视频、文件时需要,请自行申请后替换,必须修改 +api: + openImApiPort: [ 10002 ] #api服务端口 + listenIP: #默认为0.0.0.0 object: - enable: minio + enable: minio #使用minio apiURL: http://127.0.0.1:10002/third/object - minio: #MinIO 发送图片、视频、文件时需要,请自行申请后替换,必须修改。 客户端初始化InitSDK,中 object_storage参数为minio - tempBucket: "openim" - dataBucket: "openim" - location: us-east-1 - endpoint: http://127.0.0.1:10005 #minio外网ip 这个ip是给客户端访问的 - accessKeyID: root - secretAccessKey: openIM123 - isDistributedMod: false # 是否分布式多硬盘部署 默认docker-compose中为false,如果是多硬盘部署,需要修改为true - tencent: + minio: + tempBucket: "openim" #不建议修改 + dataBucket: "openim" #不建议修改 + location: us-east-1 #不建议修改 + endpoint: http://127.0.0.1:10005 #minio对外服务的ip和端口,app要能访问此ip和端口 + accessKeyID: root #ID + secretAccessKey: openIM123 #秘钥 + isDistributedMod: false #是否分布式多硬盘部署,如果是多硬盘部署,需要修改为true + tencent: #tencent cos appID: region: bucket: secretID: secretKey: - ali: # ali oss + ali: #ali oss regionID: accessKeyID: accessKeySecret: @@ -122,16 +89,16 @@ object: stsDurationSeconds: OssRoleArn: aws: - accessKeyID: #AssumeRole用户关联的accessKeyID - accessKeySecret: #AssumeRole用户关联的accessKeySecrect - region: #分区 - bucket: #桶 - finalHost: #对外Host - roleArn: #RoleArn - externalId: #角色扩展Id - roleSessionName: #角色SESSION名称 - -rpcport: #rpc服务端口 默认即可 + accessKeyID: + accessKeySecret: + region: + bucket: + finalHost: + roleArn: + externalId: + roleSessionName: + +rpcPort: #rpc服务端口,不建议修改,端口由脚本读取后传入程序,如启动多个程序,只需要填入多个端口,用逗号隔开,如 openImUserPort: [10110, 10111] openImUserPort: [ 10110 ] openImFriendPort: [ 10120 ] openImMessagePort: [ 10130 ] @@ -140,10 +107,9 @@ rpcport: #rpc服务端口 默认即可 openImAuthPort: [ 10160 ] openImPushPort: [ 10170 ] openImConversationPort: [ 10180 ] - openImRtcPort: [ 10190 ] - openImThirdPort: [ 10200 ] + openImThirdPort: [ 10190 ] -rpcregistername: #rpc注册服务名,默认即可 +rpcRegisterName: #rpc注册服务名,不建议修改 openImUserName: User openImFriendName: Friend openImMsgName: Msg @@ -152,158 +118,138 @@ rpcregistername: #rpc注册服务名,默认即可 openImGroupName: Group openImAuthName: Auth openImConversationName: Conversation - openImRtcName: Rtc openImThirdName: Third log: - storageLocation: ../logs/ - rotationTime: 24 - remainRotationCount: 2 #日志数量 - #日志级别 6表示全都打印,测试阶段建议设置为6 - remainLogLevel: 6 + storageLocation: ../logs/ #存放目录 + rotationTime: 24 #日志旋转时间 + remainRotationCount: 2 #日志数量 + remainLogLevel: 6 #日志级别 6表示全都打印, isStdout: false isJson: false withStack: false -modulename: #日志文件按模块命名,默认即可 - longConnSvrName: msg_gateway - msgTransferName: msg_transfer - pushName: push +longConnSvr: + openImWsPort: [ 10001 ] #msg_gateway的websocket端口 + websocketMaxConnNum: 100000 #websocket最大连接数 + websocketMaxMsgLen: 4096 #websocket请求包最大长度 + websocketTimeout: 10 #websocket连接握手超时时间 -longconnsvr: - openImWsPort: [ 10001 ] # ws服务端口,默认即可,要开放此端口或做nginx转发 - websocketMaxConnNum: 10000 - websocketMaxMsgLen: 4096 - websocketTimeOut: 10 - -## 推送只能开启一个 enable代表开启 push: - tpns: #腾讯推送,暂未测试 暂不要使用 - ios: - accessID: 1600018281 - secretKey: 3cd68a77a95b89e5089a1aca523f318f - android: - accessID: 111 - secretKey: 111 - enable: false # true or false (bool) - jpns: #极光推送 在极光后台申请后,修改以下四项,必须修改 - appKey: - masterSecret: - pushUrl: - pushIntent: - enable: false # true or false (bool) - getui: #个推推送 + enable: getui + geTui: #个推离线推送 pushUrl: "https://restapi.getui.com/v2/$appId" masterSecret: "" appKey: "" intent: "" - enable: false # true or false (bool) channelID: "" channelName: "" - fcm: #firebase cloud message 消息推送 - serviceAccount: "openim-5c6c0-firebase-adminsdk-ppwol-8765884a78.json" #帐号文件,此处需要改修配置,并且这个文件放在 config目录下 - enable: false - + fcm: #fcm离线推送 + serviceAccount: "x.json" #帐号文件,并放在 config目录下 + jpns: #极光推送 在极光后台申请后,修改以下四项 + appKey: + masterSecret: + pushUrl: + pushIntent: + manager: - #app管理员userID和对应的secret 建议修改。 用于管理后台登录,也可以用户管理后台对应的api - appManagerUid: [ "openIM123456","openIM654321", "openIM333", "openIMAdmin" ] - nickname: [ "系统通知","openIM654321", "openIM333", "openIMAdmin" ] - - -secret: tuoyun -# 多端互踢策略 -# 1:多平台登录:Android、iOS、Windows、Mac 每种平台只能一个在线,web端可以多个同时在线 -multiloginpolicy: 1 - -#msg log insert to db -chatpersistencemysql: true -#消息缓存时间 -msgCacheTimeout: 86400 -#群聊已读开启 -groupMessageHasReadReceiptEnable: true -#单聊已读开启 -singleMessageHasReadReceiptEnable: true - -#token config -tokenpolicy: - accessSecret: "OpenIM_server" #token生成相关,默认即可 - # Token effective time day as a unit - accessExpire: 90 #token过期时间(天) 默认即可 -messageverify: - friendVerify: false + userID: [ "openIM123456","openIM654321","openIMAdmin" ] #内置的app管理员userID + nickname: [ "system1","system2", "system3" ] #内置的app管理员nickname + + + + +multiLoginPolicy: 1 #多平台登录:Android、iOS、Windows、Mac、web 每种平台只能有一个在线 + + +chatPersistenceMysql: true #消息是否存入mysql,mysql中的消息仅用于管理后台使用 +msgCacheTimeout: 86400 #信消息缓存时间秒,不建议修改 +groupMessageHasReadReceiptEnable: true #群聊已读是否开启 +singleMessageHasReadReceiptEnable: true #单聊已读是否开启 + +retainChatRecords: 365 #mongo保存离线消息时间(天) +chatRecordsClearTime: "0 2 * * 3" #每周三凌晨2点清理mongo中的过期(超过retainChatRecords时间)消息 + + + +tokenPolicy: + accessSecret: openIM123 #秘钥,获取token时校验 + accessExpire: 90 #过期时间(天) + +messageVerify: + friendVerify: false #发送消息时是否验证好友关系 #ios系统推送声音以及标记计数 -iospush: +iosPush: pushSound: "xxx" badgeCount: true production: false callback: - # callback url 需要自行更换callback url - callbackUrl: "http://127.0.0.1:10008/callback/open_im" - # 开启关闭操作前后回调的配置 - callbackBeforeSendSingleMsg: - enable: false # 回调是否启用 - callbackTimeOut: 2 # 回调超时时间 - callbackFailedContinue: true # 回调超时是否继续执行代码 - callbackAfterSendSingleMsg: + # 回调callback + url: + beforeSendSingleMsg: + enable: false #是否启用此回调事件 + timeout: 5 #超时时间(秒) + failedContinue: true #如回调失败是否继续往后执行 + afterSendSingleMsg: enable: false - callbackTimeOut: 2 - callbackBeforeSendGroupMsg: + timeout: 5 + beforeSendGroupMsg: enable: false - callbackTimeOut: 2 - callbackFailedContinue: true - callbackAfterSendGroupMsg: + timeout: 5 + failedContinue: true + afterSendGroupMsg: enable: false - callbackTimeOut: 2 - callbackMsgModify: + timeout: 5 + msgModify: enable: false - callbackTimeOut: 2 - callbackFailedContinue: true - callbackUserOnline: + timeout: 5 + failedContinue: true + userOnline: enable: false - callbackTimeOut: 2 - callbackUserOffline: + timeout: 5 + userOffline: enable: false - callbackTimeOut: 2 - callbackUserKickOff: + timeout: 5 + userKickOff: enable: false - callbackTimeOut: 2 - callbackOfflinePush: + timeout: 5 + offlinePush: enable: false - callbackTimeOut: 2 - callbackFailedContinue: true # 回调超时是否继续离线推送 - callbackOnlinePush: + timeout: 5 + failedContinue: true + onlinePush: enable: false - callbackTimeOut: 2 - callbackFailedContinue: true # 回调超时是否继续离线推送 - callbackSuperGroupOnlinePush: + timeout: 5 + failedContinue: true + superGroupOnlinePush: enable: false - callbackTimeOut: 2 - callbackFailedContinue: true # 回调超时是否继续离线推送 - callbackBeforeAddFriend: + timeout: 5 + failedContinue: true + beforeAddFriend: enable: false - callbackTimeOut: 2 - callbackFailedContinue: true # 回调超时是否继续 - callbackBeforeCreateGroup: + timeout: 5 + failedContinue: true + beforeCreateGroup: enable: false - callbackTimeOut: 2 - callbackFailedContinue: true # 回调超时是否继续 - callbackBeforeMemberJoinGroup: + timeout: 5 + failedContinue: true + beforeMemberJoinGroup: enable: false - callbackTimeOut: 2 - callbackFailedContinue: true # 回调超时是否继续 - callbackBeforeSetGroupMemberInfo: + timeout: 5 + failedContinue: true + beforeSetGroupMemberInfo: enable: false - callbackTimeOut: 2 - callbackFailedContinue: true # 回调超时是否继续 - callbackSetMessageReactionExtensions: + timeout: 5 + failedContinue: true + setMessageReactionExtensions: enable: false - callbackTimeOut: 2 - callbackFailedContinue: true # 回调超时是否继续 + timeout: 5 + failedContinue: true + -# prometheus每个服务监听的端口数量需要和rpc port保持一致 -prometheus: +prometheus: #prometheus每个服务的端口数量需要和rpcPort保持对应 enable: false userPrometheusPort: [ 20110 ] friendPrometheusPort: [ 20120 ] @@ -314,5 +260,6 @@ prometheus: pushPrometheusPort: [ 20170 ] conversationPrometheusPort: [ 20230 ] rtcPrometheusPort: [ 21300 ] - messageTransferPrometheusPort: [ 21400, 21401, 21402, 21403 ] # 端口数量和 script/path_info.cfg msg_transfer_service_num保持一致 + thirdPrometheusPort: [ 21301 ] + messageTransferPrometheusPort: [ 21400, 21401, 21402, 21403 ] #端口数量需要和script/path_info.cfg中的msg_transfer_service_num保持一致 diff --git a/internal/msggateway/init.go b/internal/msggateway/init.go index 87f3c5464..2e001667a 100644 --- a/internal/msggateway/init.go +++ b/internal/msggateway/init.go @@ -12,7 +12,7 @@ func RunWsAndServer(rpcPort, wsPort, prometheusPort int) error { longServer, err := NewWsServer( WithPort(wsPort), WithMaxConnNum(int64(config.Config.LongConnSvr.WebsocketMaxConnNum)), - WithHandshakeTimeout(time.Duration(config.Config.LongConnSvr.WebsocketTimeOut)*time.Second), + WithHandshakeTimeout(time.Duration(config.Config.LongConnSvr.WebsocketTimeout)*time.Second), WithMessageMaxMsgLength(config.Config.LongConnSvr.WebsocketMaxMsgLen)) if err != nil { return err diff --git a/internal/msgtransfer/init.go b/internal/msgtransfer/init.go index 8c45a9910..589e81726 100644 --- a/internal/msgtransfer/init.go +++ b/internal/msgtransfer/init.go @@ -48,7 +48,7 @@ func StartTransfer(prometheusPort int) error { return err } client, err := openKeeper.NewClient(config.Config.Zookeeper.ZkAddr, config.Config.Zookeeper.Schema, - openKeeper.WithFreq(time.Hour), openKeeper.WithRoundRobin(), openKeeper.WithUserNameAndPassword(config.Config.Zookeeper.UserName, + openKeeper.WithFreq(time.Hour), openKeeper.WithRoundRobin(), openKeeper.WithUserNameAndPassword(config.Config.Zookeeper.Username, config.Config.Zookeeper.Password), openKeeper.WithTimeout(10), openKeeper.WithLogger(log.NewZkLogger())) if err != nil { return err diff --git a/internal/msgtransfer/modify_msg_handler.go b/internal/msgtransfer/modify_msg_handler.go index 605d119ae..0b9ad2aff 100644 --- a/internal/msgtransfer/modify_msg_handler.go +++ b/internal/msgtransfer/modify_msg_handler.go @@ -30,7 +30,7 @@ func NewModifyMsgConsumerHandler(database controller.ExtendMsgDatabase) *ModifyM return &ModifyMsgConsumerHandler{ modifyMsgConsumerGroup: kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{KafkaVersion: sarama.V2_0_0_0, OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false}, []string{config.Config.Kafka.MsgToModify.Topic}, - config.Config.Kafka.MsgToModify.Addr, config.Config.Kafka.ConsumerGroupID.MsgToModify), + config.Config.Kafka.Addr, config.Config.Kafka.ConsumerGroupID.MsgToModify), extendMsgDatabase: database, } } diff --git a/internal/msgtransfer/online_history_msg_handler.go b/internal/msgtransfer/online_history_msg_handler.go index 94facfef3..7f776d5ee 100644 --- a/internal/msgtransfer/online_history_msg_handler.go +++ b/internal/msgtransfer/online_history_msg_handler.go @@ -75,8 +75,8 @@ func NewOnlineHistoryRedisConsumerHandler(database controller.CommonMsgDatabase, och.conversationRpcClient = conversationRpcClient och.groupRpcClient = groupRpcClient och.historyConsumerGroup = kafka.NewMConsumerGroup(&kafka.MConsumerGroupConfig{KafkaVersion: sarama.V2_0_0_0, - OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false}, []string{config.Config.Kafka.Ws2mschat.Topic}, - config.Config.Kafka.Ws2mschat.Addr, config.Config.Kafka.ConsumerGroupID.MsgToRedis) + OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false}, []string{config.Config.Kafka.LatestMsgToRedis.Topic}, + config.Config.Kafka.Addr, config.Config.Kafka.ConsumerGroupID.MsgToRedis) //statistics.NewStatistics(&och.singleMsgSuccessCount, config.Config.ModuleName.MsgTransferName, fmt.Sprintf("%d second singleMsgCount insert to mongo", constant.StatisticsTimeInterval), constant.StatisticsTimeInterval) return &och } diff --git a/internal/msgtransfer/online_msg_to_mongo_handler.go b/internal/msgtransfer/online_msg_to_mongo_handler.go index 5d052fac4..77c5e9a6b 100644 --- a/internal/msgtransfer/online_msg_to_mongo_handler.go +++ b/internal/msgtransfer/online_msg_to_mongo_handler.go @@ -21,7 +21,7 @@ func NewOnlineHistoryMongoConsumerHandler(database controller.CommonMsgDatabase) mc := &OnlineHistoryMongoConsumerHandler{ historyConsumerGroup: kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{KafkaVersion: sarama.V2_0_0_0, OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false}, []string{config.Config.Kafka.MsgToMongo.Topic}, - config.Config.Kafka.MsgToMongo.Addr, config.Config.Kafka.ConsumerGroupID.MsgToMongo), + config.Config.Kafka.Addr, config.Config.Kafka.ConsumerGroupID.MsgToMongo), msgDatabase: database, } return mc diff --git a/internal/msgtransfer/persistent_msg_handler.go b/internal/msgtransfer/persistent_msg_handler.go index 6d9b64a13..026f5f5a0 100644 --- a/internal/msgtransfer/persistent_msg_handler.go +++ b/internal/msgtransfer/persistent_msg_handler.go @@ -29,8 +29,8 @@ type PersistentConsumerHandler struct { func NewPersistentConsumerHandler(database controller.ChatLogDatabase) *PersistentConsumerHandler { return &PersistentConsumerHandler{ persistentConsumerGroup: kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{KafkaVersion: sarama.V2_0_0_0, - OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false}, []string{config.Config.Kafka.Ws2mschat.Topic}, - config.Config.Kafka.Ws2mschat.Addr, config.Config.Kafka.ConsumerGroupID.MsgToMySql), + OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false}, []string{config.Config.Kafka.LatestMsgToRedis.Topic}, + config.Config.Kafka.Addr, config.Config.Kafka.ConsumerGroupID.MsgToMySql), chatLogDatabase: database, } } diff --git a/internal/push/offlinepush/getui/body.go b/internal/push/offlinepush/getui/body.go index 6cee9a545..b7a4bc082 100644 --- a/internal/push/offlinepush/getui/body.go +++ b/internal/push/offlinepush/getui/body.go @@ -124,8 +124,8 @@ func newPushReq(title, content string) PushReq { Title: title, Body: content, ClickType: "startapp", - ChannelID: config.Config.Push.Getui.ChannelID, - ChannelName: config.Config.Push.Getui.ChannelName, + ChannelID: config.Config.Push.GeTui.ChannelID, + ChannelName: config.Config.Push.GeTui.ChannelName, }}} return pushReq } diff --git a/internal/push/offlinepush/getui/push.go b/internal/push/offlinepush/getui/push.go index 6eb0b0fdf..db5accbd7 100644 --- a/internal/push/offlinepush/getui/push.go +++ b/internal/push/offlinepush/getui/push.go @@ -1,8 +1,13 @@ package getui import ( + "github.com/go-redis/redis" "sync" + "context" + "crypto/sha256" + "encoding/hex" + "errors" "github.com/OpenIMSDK/Open-IM-Server/internal/push/offlinepush" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/config" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/cache" @@ -11,12 +16,6 @@ import ( "github.com/OpenIMSDK/Open-IM-Server/pkg/common/mcontext" "github.com/OpenIMSDK/Open-IM-Server/pkg/errs" "github.com/OpenIMSDK/Open-IM-Server/pkg/utils/splitter" - "github.com/redis/go-redis/v9" - - "context" - "crypto/sha256" - "encoding/hex" - "errors" "strconv" "time" @@ -98,12 +97,12 @@ func (g *Client) Push(ctx context.Context, userIDs []string, title, content stri func (g *Client) Auth(ctx context.Context, timeStamp int64) (token string, expireTime int64, err error) { h := sha256.New() - h.Write([]byte(config.Config.Push.Getui.AppKey + strconv.Itoa(int(timeStamp)) + config.Config.Push.Getui.MasterSecret)) + h.Write([]byte(config.Config.Push.GeTui.AppKey + strconv.Itoa(int(timeStamp)) + config.Config.Push.GeTui.MasterSecret)) sign := hex.EncodeToString(h.Sum(nil)) reqAuth := AuthReq{ Sign: sign, Timestamp: strconv.Itoa(int(timeStamp)), - AppKey: config.Config.Push.Getui.AppKey, + AppKey: config.Config.Push.GeTui.AppKey, } respAuth := AuthResp{} err = g.request(ctx, authURL, reqAuth, "", &respAuth) @@ -146,7 +145,7 @@ func (g *Client) request(ctx context.Context, url string, input interface{}, tok header := map[string]string{"token": token} resp := &Resp{} resp.Data = output - return g.postReturn(ctx, config.Config.Push.Getui.PushUrl+url, header, input, resp, 3) + return g.postReturn(ctx, config.Config.Push.GeTui.PushUrl+url, header, input, resp, 3) } func (g *Client) postReturn(ctx context.Context, url string, header map[string]string, input interface{}, output RespI, timeout int) error { diff --git a/internal/push/push_handler.go b/internal/push/push_handler.go index 29dc9a4b6..ac128a1c7 100644 --- a/internal/push/push_handler.go +++ b/internal/push/push_handler.go @@ -23,7 +23,7 @@ func NewConsumerHandler(pusher *Pusher) *ConsumerHandler { var consumerHandler ConsumerHandler consumerHandler.pusher = pusher consumerHandler.pushConsumerGroup = kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{KafkaVersion: sarama.V2_0_0_0, - OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false}, []string{config.Config.Kafka.Ms2pschat.Topic}, config.Config.Kafka.Ms2pschat.Addr, + OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false}, []string{config.Config.Kafka.MsgToPush.Topic}, config.Config.Kafka.Addr, config.Config.Kafka.ConsumerGroupID.MsgToPush) return &consumerHandler } diff --git a/internal/push/push_to_client.go b/internal/push/push_to_client.go index 7eead73ee..df38f06cf 100644 --- a/internal/push/push_to_client.go +++ b/internal/push/push_to_client.go @@ -57,13 +57,12 @@ func NewPusher(discov discoveryregistry.SvcDiscoveryRegistry, offlinePusher offl func NewOfflinePusher(cache cache.MsgModel) offlinepush.OfflinePusher { var offlinePusher offlinepush.OfflinePusher - if config.Config.Push.Getui.Enable { + switch config.Config.Push.Enable { + case "getui": offlinePusher = getui.NewClient(cache) - } - if config.Config.Push.Fcm.Enable { + case "fcm": offlinePusher = fcm.NewClient(cache) - } - if config.Config.Push.Jpns.Enable { + case "jpush": offlinePusher = jpush.NewClient() } return offlinePusher @@ -164,8 +163,8 @@ func (p *Pusher) Push2SuperGroup(ctx context.Context, groupID string, msg *sdkws return err } log.ZInfo(ctx, "GroupDismissedNotificationInfo****", "groupID", groupID, "num", len(pushToUserIDs), "list", pushToUserIDs) - if len(config.Config.Manager.AppManagerUid) > 0 { - ctx = mcontext.WithOpUserIDContext(ctx, config.Config.Manager.AppManagerUid[0]) + if len(config.Config.Manager.UserID) > 0 { + ctx = mcontext.WithOpUserIDContext(ctx, config.Config.Manager.UserID[0]) } defer func(groupID string) { if err := p.groupRpcClient.DismissGroup(ctx, groupID); err != nil { diff --git a/internal/rpc/auth/auth.go b/internal/rpc/auth/auth.go index d0a6176f4..b70daec18 100644 --- a/internal/rpc/auth/auth.go +++ b/internal/rpc/auth/auth.go @@ -48,7 +48,7 @@ func (s *authServer) UserToken(ctx context.Context, req *pbAuth.UserTokenReq) (* return nil, err } resp.Token = token - resp.ExpireTimeSeconds = config.Config.TokenPolicy.AccessExpire + resp.ExpireTimeSeconds = config.Config.TokenPolicy.AccessExpire * 24 * 60 * 60 return &resp, nil } diff --git a/internal/rpc/msg/verify.go b/internal/rpc/msg/verify.go index d0ebc7baa..9d5d05efd 100644 --- a/internal/rpc/msg/verify.go +++ b/internal/rpc/msg/verify.go @@ -38,7 +38,7 @@ type MessageRevoked struct { func (m *msgServer) messageVerification(ctx context.Context, data *msg.SendMsgReq) error { switch data.MsgData.SessionType { case constant.SingleChatType: - if utils.IsContain(data.MsgData.SendID, config.Config.Manager.AppManagerUid) { + if utils.IsContain(data.MsgData.SendID, config.Config.Manager.UserID) { return nil } if data.MsgData.ContentType <= constant.NotificationEnd && data.MsgData.ContentType >= constant.NotificationBegin { @@ -73,7 +73,7 @@ func (m *msgServer) messageVerification(ctx context.Context, data *msg.SendMsgRe if groupInfo.GroupType == constant.SuperGroup { return nil } - if utils.IsContain(data.MsgData.SendID, config.Config.Manager.AppManagerUid) { + if utils.IsContain(data.MsgData.SendID, config.Config.Manager.UserID) { return nil } if data.MsgData.ContentType <= constant.NotificationEnd && data.MsgData.ContentType >= constant.NotificationBegin { diff --git a/internal/rpc/user/user.go b/internal/rpc/user/user.go index bc7be1f47..25720f938 100644 --- a/internal/rpc/user/user.go +++ b/internal/rpc/user/user.go @@ -45,10 +45,10 @@ func Start(client registry.SvcDiscoveryRegistry, server *grpc.Server) error { return err } users := make([]*tablerelation.UserModel, 0) - if len(config.Config.Manager.AppManagerUid) != len(config.Config.Manager.Nickname) { + if len(config.Config.Manager.UserID) != len(config.Config.Manager.Nickname) { return errs.ErrConfig.Wrap("len(config.Config.Manager.AppManagerUid) != len(config.Config.Manager.Nickname)") } - for k, v := range config.Config.Manager.AppManagerUid { + for k, v := range config.Config.Manager.UserID { users = append(users, &tablerelation.UserModel{UserID: v, Nickname: config.Config.Manager.Nickname[k]}) } userDB := relation.NewUserGorm(db) diff --git a/internal/tools/cron_task.go b/internal/tools/cron_task.go index c55a0d046..debf61a6d 100644 --- a/internal/tools/cron_task.go +++ b/internal/tools/cron_task.go @@ -11,8 +11,8 @@ import ( ) func StartCronTask() error { - log.ZInfo(context.Background(), "start cron task", "cron config", config.Config.Mongo.ChatRecordsClearTime) - fmt.Println("cron task start, config", config.Config.Mongo.ChatRecordsClearTime) + log.ZInfo(context.Background(), "start cron task", "cron config", config.Config.ChatRecordsClearTime) + fmt.Println("cron task start, config", config.Config.ChatRecordsClearTime) msgTool, err := InitMsgTool() if err != nil { return err @@ -20,9 +20,9 @@ func StartCronTask() error { c := cron.New() var wg sync.WaitGroup wg.Add(1) - _, err = c.AddFunc(config.Config.Mongo.ChatRecordsClearTime, msgTool.AllConversationClearMsgAndFixSeq) + _, err = c.AddFunc(config.Config.ChatRecordsClearTime, msgTool.AllConversationClearMsgAndFixSeq) if err != nil { - fmt.Println("start cron failed", err.Error(), config.Config.Mongo.ChatRecordsClearTime) + fmt.Println("start cron failed", err.Error(), config.Config.ChatRecordsClearTime) return err } c.Start() diff --git a/internal/tools/msg.go b/internal/tools/msg.go index 2b47560e5..fc810aa95 100644 --- a/internal/tools/msg.go +++ b/internal/tools/msg.go @@ -74,8 +74,8 @@ func (c *MsgTool) AllConversationClearMsgAndFixSeq() { func (c *MsgTool) ClearConversationsMsg(ctx context.Context, conversationIDs []string) { for _, conversationID := range conversationIDs { - if err := c.msgDatabase.DeleteConversationMsgsAndSetMinSeq(ctx, conversationID, int64(config.Config.Mongo.DBRetainChatRecords*24*60*60)); err != nil { - log.ZError(ctx, "DeleteUserSuperGroupMsgsAndSetMinSeq failed", err, "conversationID", conversationID, "DBRetainChatRecords", config.Config.Mongo.DBRetainChatRecords) + if err := c.msgDatabase.DeleteConversationMsgsAndSetMinSeq(ctx, conversationID, int64(config.Config.RetainChatRecords*24*60*60)); err != nil { + log.ZError(ctx, "DeleteUserSuperGroupMsgsAndSetMinSeq failed", err, "conversationID", conversationID, "DBRetainChatRecords", config.Config.RetainChatRecords) } if err := c.checkMaxSeq(ctx, conversationID); err != nil { log.ZError(ctx, "fixSeq failed", err, "conversationID", conversationID) diff --git a/pkg/common/config/config.go b/pkg/common/config/config.go index acfb09a01..053305f26 100644 --- a/pkg/common/config/config.go +++ b/pkg/common/config/config.go @@ -2,42 +2,17 @@ package config import ( _ "embed" - "fmt" - "io/ioutil" - "os" - "path/filepath" - "runtime" - - "github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant" - "github.com/OpenIMSDK/Open-IM-Server/pkg/discoveryregistry" - "github.com/OpenIMSDK/Open-IM-Server/pkg/utils" - - "gopkg.in/yaml.v3" ) //go:embed version var Version string -var ( - _, b, _, _ = runtime.Caller(0) - // Root folder of this project - Root = filepath.Join(filepath.Dir(b), "../../..") -) - -const ( - FileName = "config.yaml" - NotificationFileName = "notification.yaml" - ENV = "CONFIG_NAME" - DefaultFolderPath = "../config/" - ConfKey = "conf" -) - var Config config type CallBackConfig struct { Enable bool `yaml:"enable"` - CallbackTimeOut int `yaml:"callbackTimeOut"` - CallbackFailedContinue *bool `yaml:"callbackFailedContinue"` + CallbackTimeOut int `yaml:"timeout"` + CallbackFailedContinue *bool `yaml:"failedContinue"` } type NotificationConf struct { @@ -55,22 +30,74 @@ type POfflinePush struct { } type config struct { - ServerIP string `yaml:"serverip"` + Zookeeper struct { + Schema string `yaml:"schema"` + ZkAddr []string `yaml:"address"` + Username string `yaml:"username"` + Password string `yaml:"password"` + } `yaml:"zookeeper"` - RpcRegisterIP string `yaml:"rpcRegisterIP"` - ListenIP string `yaml:"listenIP"` + Mysql struct { + Address []string `yaml:"address"` + Username string `yaml:"username"` + Password string `yaml:"password"` + Database string `yaml:"database"` + MaxOpenConn int `yaml:"maxOpenConn"` + MaxIdleConn int `yaml:"maxIdleConn"` + MaxLifeTime int `yaml:"maxLifeTime"` + LogLevel int `yaml:"logLevel"` + SlowThreshold int `yaml:"slowThreshold"` + } `yaml:"mysql"` - ServerVersion string `yaml:"serverversion"` - Api struct { - GinPort []int `yaml:"openImApiPort"` - ListenIP string `yaml:"listenIP"` - } - Sdk struct { - WsPort []int `yaml:"openImSdkWsPort"` - DataDir []string `yaml:"dataDir"` - } - Credential struct { - } + Mongo struct { + Uri string `yaml:"uri"` + Address []string `yaml:"address"` + Database string `yaml:"database"` + Username string `yaml:"username"` + Password string `yaml:"password"` + MaxPoolSize int `yaml:"maxPoolSize"` + } `yaml:"mongo"` + + Redis struct { + Address []string `yaml:"address"` + Username string `yaml:"username"` + Password string `yaml:"password"` + } `yaml:"redis"` + + Kafka struct { + Username string `yaml:"username"` + Password string `yaml:"password"` + Addr []string `yaml:"addr"` + LatestMsgToRedis struct { + Topic string `yaml:"topic"` + } `yaml:"latestMsgToRedis"` + MsgToMongo struct { + Topic string `yaml:"topic"` + } `yaml:"offlineMsgToMongo"` + MsgToPush struct { + Topic string `yaml:"topic"` + } `yaml:"msgToPush"` + MsgToModify struct { + Topic string `yaml:"topic"` + } `yaml:"msgToModify"` + ConsumerGroupID struct { + MsgToRedis string `yaml:"msgToRedis"` + MsgToMongo string `yaml:"msgToMongo"` + MsgToMySql string `yaml:"msgToMySql"` + MsgToPush string `yaml:"msgToPush"` + MsgToModify string `yaml:"msgToModify"` + } `yaml:"consumerGroupID"` + } `yaml:"kafka"` + + Rpc struct { + RegisterIP string `yaml:"registerIP"` + ListenIP string `yaml:"listenIP"` + } `yaml:"rpc"` + + Api struct { + OpenImApiPort []int `yaml:"openImApiPort"` + ListenIP string `yaml:"listenIP"` + } `yaml:"api"` Object struct { Enable string `yaml:"enable"` @@ -90,7 +117,7 @@ type config struct { Bucket string `yaml:"bucket"` SecretID string `yaml:"secretID"` SecretKey string `yaml:"secretKey"` - } + } `yaml:"tencent"` Ali struct { RegionID string `yaml:"regionID"` AccessKeyID string `yaml:"accessKeyID"` @@ -101,7 +128,7 @@ type config struct { FinalHost string `yaml:"finalHost"` StsDurationSeconds int64 `yaml:"stsDurationSeconds"` OssRoleArn string `yaml:"OssRoleArn"` - } + } `yaml:"ali"` Aws struct { AccessKeyID string `yaml:"accessKeyID"` AccessKeySecret string `yaml:"accessKeySecret"` @@ -112,43 +139,8 @@ type config struct { ExternalId string `yaml:"externalId"` RoleSessionName string `yaml:"roleSessionName"` } `yaml:"aws"` - } + } `yaml:"object"` - Mysql struct { - DBAddress []string `yaml:"dbMysqlAddress"` - DBUserName string `yaml:"dbMysqlUserName"` - DBPassword string `yaml:"dbMysqlPassword"` - DBDatabaseName string `yaml:"dbMysqlDatabaseName"` - DBTableName string `yaml:"DBTableName"` - DBMsgTableNum int `yaml:"dbMsgTableNum"` - DBMaxOpenConns int `yaml:"dbMaxOpenConns"` - DBMaxIdleConns int `yaml:"dbMaxIdleConns"` - DBMaxLifeTime int `yaml:"dbMaxLifeTime"` - LogLevel int `yaml:"logLevel"` - SlowThreshold int `yaml:"slowThreshold"` - } - Mongo struct { - DBUri string `yaml:"dbUri"` - DBAddress []string `yaml:"dbAddress"` - DBDirect bool `yaml:"dbDirect"` - DBTimeout int `yaml:"dbTimeout"` - DBDatabase string `yaml:"dbDatabase"` - DBSource string `yaml:"dbSource"` - DBUserName string `yaml:"dbUserName"` - DBPassword string `yaml:"dbPassword"` - DBMaxPoolSize int `yaml:"dbMaxPoolSize"` - DBRetainChatRecords int `yaml:"dbRetainChatRecords"` - ChatRecordsClearTime string `yaml:"chatRecordsClearTime"` - } - Redis struct { - DBAddress []string `yaml:"dbAddress"` - DBMaxIdle int `yaml:"dbMaxIdle"` - DBMaxActive int `yaml:"dbMaxActive"` - DBIdleTimeout int `yaml:"dbIdleTimeout"` - DBUserName string `yaml:"dbUserName"` - DBPassWord string `yaml:"dbPassWord"` - EnableCluster bool `yaml:"enableCluster"` - } RpcPort struct { OpenImUserPort []int `yaml:"openImUserPort"` OpenImFriendPort []int `yaml:"openImFriendPort"` @@ -158,10 +150,10 @@ type config struct { OpenImAuthPort []int `yaml:"openImAuthPort"` OpenImPushPort []int `yaml:"openImPushPort"` OpenImConversationPort []int `yaml:"openImConversationPort"` - OpenImCachePort []int `yaml:"openImCachePort"` OpenImRtcPort []int `yaml:"openImRtcPort"` OpenImThirdPort []int `yaml:"openImThirdPort"` - } + } `yaml:"rpcPort"` + RpcRegisterName struct { OpenImUserName string `yaml:"openImUserName"` OpenImFriendName string `yaml:"openImFriendName"` @@ -171,128 +163,89 @@ type config struct { OpenImGroupName string `yaml:"openImGroupName"` OpenImAuthName string `yaml:"openImAuthName"` OpenImConversationName string `yaml:"openImConversationName"` - OpenImRtcName string `yaml:"openImRtcName"` OpenImThirdName string `yaml:"openImThirdName"` - } - Zookeeper struct { - Schema string `yaml:"schema"` - ZkAddr []string `yaml:"zkAddr"` - UserName string `yaml:"userName"` - Password string `yaml:"password"` - } `yaml:"zookeeper"` + } `yaml:"rpcRegisterName"` + Log struct { StorageLocation string `yaml:"storageLocation"` RotationTime int `yaml:"rotationTime"` RemainRotationCount uint `yaml:"remainRotationCount"` RemainLogLevel int `yaml:"remainLogLevel"` IsStdout bool `yaml:"isStdout"` - WithStack bool `yaml:"withStack"` IsJson bool `yaml:"isJson"` - } - ModuleName struct { - LongConnSvrName string `yaml:"longConnSvrName"` - MsgTransferName string `yaml:"msgTransferName"` - PushName string `yaml:"pushName"` - } + WithStack bool `yaml:"withStack"` + } `yaml:"log"` + LongConnSvr struct { - WebsocketPort []int `yaml:"openImWsPort"` + OpenImWsPort []int `yaml:"openImWsPort"` WebsocketMaxConnNum int `yaml:"websocketMaxConnNum"` WebsocketMaxMsgLen int `yaml:"websocketMaxMsgLen"` - WebsocketTimeOut int `yaml:"websocketTimeOut"` - } + WebsocketTimeout int `yaml:"websocketTimeout"` + } `yaml:"longConnSvr"` Push struct { - Jpns struct { - AppKey string `yaml:"appKey"` - MasterSecret string `yaml:"masterSecret"` - PushUrl string `yaml:"pushUrl"` - PushIntent string `yaml:"pushIntent"` - Enable bool `yaml:"enable"` - } - Getui struct { + Enable string `yaml:"enable"` + GeTui struct { PushUrl string `yaml:"pushUrl"` AppKey string `yaml:"appKey"` - Enable bool `yaml:"enable"` Intent string `yaml:"intent"` MasterSecret string `yaml:"masterSecret"` ChannelID string `yaml:"channelID"` ChannelName string `yaml:"channelName"` - } + } `yaml:"geTui"` Fcm struct { ServiceAccount string `yaml:"serviceAccount"` - Enable bool `yaml:"enable"` - } + } `yaml:"fcm"` + Jpns struct { + AppKey string `yaml:"appKey"` + MasterSecret string `yaml:"masterSecret"` + PushUrl string `yaml:"pushUrl"` + PushIntent string `yaml:"pushIntent"` + } `yaml:"jpns"` } Manager struct { - AppManagerUid []string `yaml:"appManagerUid"` - Nickname []string `yaml:"nickname"` - } + UserID []string `yaml:"userID"` + Nickname []string `yaml:"nickname"` + } `yaml:"manager"` - Kafka struct { - SASLUserName string `yaml:"SASLUserName"` - SASLPassword string `yaml:"SASLPassword"` - Ws2mschat struct { - Addr []string `yaml:"addr"` - Topic string `yaml:"topic"` - } - MsgToMongo struct { - Addr []string `yaml:"addr"` - Topic string `yaml:"topic"` - } - Ms2pschat struct { - Addr []string `yaml:"addr"` - Topic string `yaml:"topic"` - } - MsgToModify struct { - Addr []string `yaml:"addr"` - Topic string `yaml:"topic"` - } - ConsumerGroupID struct { - MsgToRedis string `yaml:"msgToTransfer"` - MsgToMongo string `yaml:"msgToMongo"` - MsgToMySql string `yaml:"msgToMySql"` - MsgToPush string `yaml:"msgToPush"` - MsgToModify string `yaml:"msgToModify"` - } - } - Secret string `yaml:"secret"` - MultiLoginPolicy int `yaml:"multiloginpolicy"` - ChatPersistenceMysql bool `yaml:"chatpersistencemysql"` + MultiLoginPolicy int `yaml:"multiLoginPolicy"` + ChatPersistenceMysql bool `yaml:"chatPersistenceMysql"` MsgCacheTimeout int `yaml:"msgCacheTimeout"` GroupMessageHasReadReceiptEnable bool `yaml:"groupMessageHasReadReceiptEnable"` SingleMessageHasReadReceiptEnable bool `yaml:"singleMessageHasReadReceiptEnable"` - - TokenPolicy struct { + RetainChatRecords int `yaml:"retainChatRecords"` + ChatRecordsClearTime string `yaml:"chatRecordsClearTime"` + TokenPolicy struct { AccessSecret string `yaml:"accessSecret"` AccessExpire int64 `yaml:"accessExpire"` - } + } `yaml:"tokenPolicy"` MessageVerify struct { FriendVerify *bool `yaml:"friendVerify"` - } + } `yaml:"messageVerify"` + IOSPush struct { PushSound string `yaml:"pushSound"` BadgeCount bool `yaml:"badgeCount"` Production bool `yaml:"production"` - } + } `yaml:"iosPush"` Callback struct { - CallbackUrl string `yaml:"callbackUrl"` - CallbackBeforeSendSingleMsg CallBackConfig `yaml:"callbackBeforeSendSingleMsg"` - CallbackAfterSendSingleMsg CallBackConfig `yaml:"callbackAfterSendSingleMsg"` - CallbackBeforeSendGroupMsg CallBackConfig `yaml:"callbackBeforeSendGroupMsg"` - CallbackAfterSendGroupMsg CallBackConfig `yaml:"callbackAfterSendGroupMsg"` - CallbackMsgModify CallBackConfig `yaml:"callbackMsgModify"` - CallbackUserOnline CallBackConfig `yaml:"callbackUserOnline"` - CallbackUserOffline CallBackConfig `yaml:"callbackUserOffline"` - CallbackUserKickOff CallBackConfig `yaml:"callbackUserKickOff"` - CallbackOfflinePush CallBackConfig `yaml:"callbackOfflinePush"` - CallbackOnlinePush CallBackConfig `yaml:"callbackOnlinePush"` - CallbackBeforeSuperGroupOnlinePush CallBackConfig `yaml:"callbackSuperGroupOnlinePush"` - CallbackBeforeAddFriend CallBackConfig `yaml:"callbackBeforeAddFriend"` - CallbackBeforeCreateGroup CallBackConfig `yaml:"callbackBeforeCreateGroup"` - CallbackBeforeMemberJoinGroup CallBackConfig `yaml:"callbackBeforeMemberJoinGroup"` - CallbackBeforeSetGroupMemberInfo CallBackConfig `yaml:"callbackBeforeSetGroupMemberInfo"` + CallbackUrl string `yaml:"url"` + CallbackBeforeSendSingleMsg CallBackConfig `yaml:"beforeSendSingleMsg"` + CallbackAfterSendSingleMsg CallBackConfig `yaml:"afterSendSingleMsg"` + CallbackBeforeSendGroupMsg CallBackConfig `yaml:"beforeSendGroupMsg"` + CallbackAfterSendGroupMsg CallBackConfig `yaml:"afterSendGroupMsg"` + CallbackMsgModify CallBackConfig `yaml:"msgModify"` + CallbackUserOnline CallBackConfig `yaml:"userOnline"` + CallbackUserOffline CallBackConfig `yaml:"userOffline"` + CallbackUserKickOff CallBackConfig `yaml:"userKickOff"` + CallbackOfflinePush CallBackConfig `yaml:"offlinePush"` + CallbackOnlinePush CallBackConfig `yaml:"onlinePush"` + CallbackBeforeSuperGroupOnlinePush CallBackConfig `yaml:"superGroupOnlinePush"` + CallbackBeforeAddFriend CallBackConfig `yaml:"beforeAddFriend"` + CallbackBeforeCreateGroup CallBackConfig `yaml:"beforeCreateGroup"` + CallbackBeforeMemberJoinGroup CallBackConfig `yaml:"beforeMemberJoinGroup"` + CallbackBeforeSetGroupMemberInfo CallBackConfig `yaml:"beforeSetGroupMemberInfo"` } `yaml:"callback"` - Notification Notification `yaml:"notification"` Prometheus struct { Enable bool `yaml:"enable"` @@ -308,9 +261,10 @@ type config struct { MessageTransferPrometheusPort []int `yaml:"messageTransferPrometheusPort"` ThirdPrometheusPort []int `yaml:"thirdPrometheusPort"` } `yaml:"prometheus"` + Notification notification `yaml:"notification"` } -type Notification struct { +type notification struct { GroupCreated NotificationConf `yaml:"groupCreated"` GroupInfoSet NotificationConf `yaml:"groupInfoSet"` JoinGroupApplication NotificationConf `yaml:"joinGroupApplication"` @@ -347,82 +301,3 @@ type Notification struct { ConversationChanged NotificationConf `yaml:"conversationChanged"` ConversationSetPrivate NotificationConf `yaml:"conversationSetPrivate"` } - -func GetServiceNames() []string { - return []string{Config.RpcRegisterName.OpenImUserName, Config.RpcRegisterName.OpenImFriendName, Config.RpcRegisterName.OpenImMsgName, - Config.RpcRegisterName.OpenImPushName, Config.RpcRegisterName.OpenImMessageGatewayName, Config.RpcRegisterName.OpenImGroupName, - Config.RpcRegisterName.OpenImAuthName, Config.RpcRegisterName.OpenImConversationName, Config.RpcRegisterName.OpenImRtcName, - Config.RpcRegisterName.OpenImThirdName} -} - -func GetOptionsByNotification(cfg NotificationConf) utils.Options { - opts := utils.NewOptions() - if cfg.UnreadCount { - opts = utils.WithOptions(opts, utils.WithUnreadCount(true)) - } - if cfg.OfflinePush.Enable { - opts = utils.WithOptions(opts, utils.WithOfflinePush(true)) - } - switch cfg.ReliabilityLevel { - case constant.UnreliableNotification: - case constant.ReliableNotificationNoMsg: - opts = utils.WithOptions(opts, utils.WithHistory(true), utils.WithPersistent()) - } - opts = utils.WithOptions(opts, utils.WithSendMsg(cfg.IsSendMsg)) - return opts -} - -func (c *config) unmarshalConfig(config interface{}, configPath string) error { - bytes, err := ioutil.ReadFile(configPath) - if err != nil { - return err - } - if err = yaml.Unmarshal(bytes, config); err != nil { - return err - } - return nil -} - -func (c *config) initConfig(config interface{}, configName, configFolderPath string) error { - if configFolderPath == "" { - configFolderPath = DefaultFolderPath - } - configPath := filepath.Join(configFolderPath, configName) - defer func() { - fmt.Println("use config", configPath) - }() - _, err := os.Stat(configPath) - if err != nil { - if !os.IsNotExist(err) { - return err - } - configPath = filepath.Join(Root, "config", configName) - } else { - Root = filepath.Dir(configPath) - } - return c.unmarshalConfig(config, configPath) -} - -func (c *config) RegisterConf2Registry(registry discoveryregistry.SvcDiscoveryRegistry) error { - bytes, err := yaml.Marshal(Config) - if err != nil { - return err - } - return registry.RegisterConf2Registry(ConfKey, bytes) -} - -func (c *config) GetConfFromRegistry(registry discoveryregistry.SvcDiscoveryRegistry) ([]byte, error) { - return registry.GetConfFromRegistry(ConfKey) -} - -func InitConfig(configFolderPath string) error { - err := Config.initConfig(&Config, FileName, configFolderPath) - if err != nil { - return err - } - err = Config.initConfig(&Config.Notification, NotificationFileName, configFolderPath) - if err != nil { - return err - } - return nil -} diff --git a/pkg/common/config/parse.go b/pkg/common/config/parse.go new file mode 100644 index 000000000..8ab68b68f --- /dev/null +++ b/pkg/common/config/parse.go @@ -0,0 +1,107 @@ +package config + +import ( + "bytes" + "fmt" + "github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant" + "github.com/OpenIMSDK/Open-IM-Server/pkg/discoveryregistry" + "github.com/OpenIMSDK/Open-IM-Server/pkg/utils" + "gopkg.in/yaml.v3" + "os" + "path/filepath" + "runtime" +) + +var ( + _, b, _, _ = runtime.Caller(0) + // Root folder of this project + Root = filepath.Join(filepath.Dir(b), "../../..") +) + +const ( + FileName = "config.yaml" + NotificationFileName = "notification.yaml" + ENV = "CONFIG_NAME" + DefaultFolderPath = "../config/" + ConfKey = "conf" +) + +func GetOptionsByNotification(cfg NotificationConf) utils.Options { + opts := utils.NewOptions() + if cfg.UnreadCount { + opts = utils.WithOptions(opts, utils.WithUnreadCount(true)) + } + if cfg.OfflinePush.Enable { + opts = utils.WithOptions(opts, utils.WithOfflinePush(true)) + } + switch cfg.ReliabilityLevel { + case constant.UnreliableNotification: + case constant.ReliableNotificationNoMsg: + opts = utils.WithOptions(opts, utils.WithHistory(true), utils.WithPersistent()) + } + opts = utils.WithOptions(opts, utils.WithSendMsg(cfg.IsSendMsg)) + return opts +} + +func (c *config) unmarshalConfig(config interface{}, configPath string) error { + bytes, err := os.ReadFile(configPath) + if err != nil { + return err + } + if err = yaml.Unmarshal(bytes, config); err != nil { + return err + } + return nil +} + +func (c *config) initConfig(config interface{}, configName, configFolderPath string) error { + if configFolderPath == "" { + configFolderPath = DefaultFolderPath + } + configPath := filepath.Join(configFolderPath, configName) + defer func() { + fmt.Println("use config", configPath) + }() + _, err := os.Stat(configPath) + if err != nil { + if !os.IsNotExist(err) { + return err + } + configPath = filepath.Join(Root, "config", configName) + } else { + Root = filepath.Dir(configPath) + } + return c.unmarshalConfig(config, configPath) +} + +func (c *config) RegisterConf2Registry(registry discoveryregistry.SvcDiscoveryRegistry) error { + bytes, err := yaml.Marshal(Config) + if err != nil { + return err + } + return registry.RegisterConf2Registry(ConfKey, bytes) +} + +func (c *config) GetConfFromRegistry(registry discoveryregistry.SvcDiscoveryRegistry) ([]byte, error) { + return registry.GetConfFromRegistry(ConfKey) +} + +func InitConfig(configFolderPath string) error { + err := Config.initConfig(&Config, FileName, configFolderPath) + if err != nil { + return err + } + err = Config.initConfig(&Config.Notification, NotificationFileName, configFolderPath) + if err != nil { + return err + } + return nil +} + +func EncodeConfig() []byte { + buf := bytes.NewBuffer(nil) + if err := yaml.NewEncoder(buf).Encode(Config); err != nil { + panic(err) + } + return buf.Bytes() +} diff --git a/pkg/common/db/cache/init_redis.go b/pkg/common/db/cache/init_redis.go index 47dcc2adf..c0277c445 100644 --- a/pkg/common/db/cache/init_redis.go +++ b/pkg/common/db/cache/init_redis.go @@ -2,6 +2,7 @@ package cache import ( "context" + "errors" "fmt" "time" @@ -12,22 +13,25 @@ import ( ) func NewRedis() (redis.UniversalClient, error) { + if len(config.Config.Redis.Address) == 0 { + return nil, errors.New("redis address is empty") + } specialerror.AddReplace(redis.Nil, errs.ErrRecordNotFound) var rdb redis.UniversalClient - if config.Config.Redis.EnableCluster { + if len(config.Config.Redis.Address) > 1 { rdb = redis.NewClusterClient(&redis.ClusterOptions{ - Addrs: config.Config.Redis.DBAddress, - Username: config.Config.Redis.DBUserName, - Password: config.Config.Redis.DBPassWord, // no password set + Addrs: config.Config.Redis.Address, + Username: config.Config.Redis.Username, + Password: config.Config.Redis.Password, // no password set PoolSize: 50, }) } else { rdb = redis.NewClient(&redis.Options{ - Addr: config.Config.Redis.DBAddress[0], - Username: config.Config.Redis.DBUserName, - Password: config.Config.Redis.DBPassWord, // no password set - DB: 0, // use default DB - PoolSize: 100, // 连接池大小 + Addr: config.Config.Redis.Address[0], + Username: config.Config.Redis.Username, + Password: config.Config.Redis.Password, // no password set + DB: 0, // use default DB + PoolSize: 100, // 连接池大小 }) } ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) diff --git a/pkg/common/db/controller/msg.go b/pkg/common/db/controller/msg.go index 09592d650..e732f1f08 100644 --- a/pkg/common/db/controller/msg.go +++ b/pkg/common/db/controller/msg.go @@ -2,6 +2,7 @@ package controller import ( "fmt" + "github.com/redis/go-redis/v9" "time" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/config" @@ -20,7 +21,6 @@ import ( pbMsg "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/msg" "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws" "github.com/OpenIMSDK/Open-IM-Server/pkg/utils" - "github.com/redis/go-redis/v9" "go.mongodb.org/mongo-driver/mongo" ) @@ -98,10 +98,10 @@ func NewCommonMsgDatabase(msgDocModel unRelationTb.MsgDocModelInterface, cacheMo return &commonMsgDatabase{ msgDocDatabase: msgDocModel, cache: cacheModel, - producer: kafka.NewKafkaProducer(config.Config.Kafka.Ws2mschat.Addr, config.Config.Kafka.Ws2mschat.Topic), - producerToMongo: kafka.NewKafkaProducer(config.Config.Kafka.MsgToMongo.Addr, config.Config.Kafka.MsgToMongo.Topic), - producerToPush: kafka.NewKafkaProducer(config.Config.Kafka.Ms2pschat.Addr, config.Config.Kafka.Ms2pschat.Topic), - producerToModify: kafka.NewKafkaProducer(config.Config.Kafka.MsgToModify.Addr, config.Config.Kafka.MsgToModify.Topic), + producer: kafka.NewKafkaProducer(config.Config.Kafka.Addr, config.Config.Kafka.LatestMsgToRedis.Topic), + producerToMongo: kafka.NewKafkaProducer(config.Config.Kafka.Addr, config.Config.Kafka.MsgToMongo.Topic), + producerToPush: kafka.NewKafkaProducer(config.Config.Kafka.Addr, config.Config.Kafka.MsgToPush.Topic), + producerToModify: kafka.NewKafkaProducer(config.Config.Kafka.Addr, config.Config.Kafka.MsgToModify.Topic), } } diff --git a/pkg/common/db/controller/msg_test.go b/pkg/common/db/controller/msg_test.go index 2ccefab19..e5bfe2e0d 100644 --- a/pkg/common/db/controller/msg_test.go +++ b/pkg/common/db/controller/msg_test.go @@ -18,15 +18,15 @@ import ( ) func Test_BatchInsertChat2DB(t *testing.T) { - config.Config.Mongo.DBAddress = []string{"192.168.44.128:37017"} - config.Config.Mongo.DBTimeout = 60 - config.Config.Mongo.DBDatabase = "openIM" - config.Config.Mongo.DBSource = "admin" - config.Config.Mongo.DBUserName = "root" - config.Config.Mongo.DBPassword = "openIM123" - config.Config.Mongo.DBMaxPoolSize = 100 - config.Config.Mongo.DBRetainChatRecords = 3650 - config.Config.Mongo.ChatRecordsClearTime = "0 2 * * 3" + config.Config.Mongo.Address = []string{"192.168.44.128:37017"} + config.Config.Mongo.Timeout = 60 + config.Config.Mongo.Database = "openIM" + config.Config.Mongo.Source = "admin" + config.Config.Mongo.Username = "root" + config.Config.Mongo.Password = "openIM123" + config.Config.Mongo.MaxPoolSize = 100 + config.Config.RetainChatRecords = 3650 + config.Config.ChatRecordsClearTime = "0 2 * * 3" mongo, err := unrelation.NewMongo() if err != nil { @@ -129,15 +129,15 @@ func Test_BatchInsertChat2DB(t *testing.T) { } func GetDB() *commonMsgDatabase { - config.Config.Mongo.DBAddress = []string{"192.168.44.128:37017"} - config.Config.Mongo.DBTimeout = 60 - config.Config.Mongo.DBDatabase = "openIM" - config.Config.Mongo.DBSource = "admin" - config.Config.Mongo.DBUserName = "root" - config.Config.Mongo.DBPassword = "openIM123" - config.Config.Mongo.DBMaxPoolSize = 100 - config.Config.Mongo.DBRetainChatRecords = 3650 - config.Config.Mongo.ChatRecordsClearTime = "0 2 * * 3" + config.Config.Mongo.Address = []string{"192.168.44.128:37017"} + config.Config.Mongo.Timeout = 60 + config.Config.Mongo.Database = "openIM" + config.Config.Mongo.Source = "admin" + config.Config.Mongo.Username = "root" + config.Config.Mongo.Password = "openIM123" + config.Config.Mongo.MaxPoolSize = 100 + config.Config.RetainChatRecords = 3650 + config.Config.ChatRecordsClearTime = "0 2 * * 3" mongo, err := unrelation.NewMongo() if err != nil { diff --git a/pkg/common/db/relation/mysql_init.go b/pkg/common/db/relation/mysql_init.go index 43d07b646..b63bada05 100644 --- a/pkg/common/db/relation/mysql_init.go +++ b/pkg/common/db/relation/mysql_init.go @@ -16,7 +16,7 @@ import ( func newMysqlGormDB() (*gorm.DB, error) { dsn := fmt.Sprintf("%s:%s@tcp(%s)/%s?charset=utf8mb4&parseTime=true&loc=Local", - config.Config.Mysql.DBUserName, config.Config.Mysql.DBPassword, config.Config.Mysql.DBAddress[0], "mysql") + config.Config.Mysql.Username, config.Config.Mysql.Password, config.Config.Mysql.Address[0], "mysql") db, err := gorm.Open(mysql.Open(dsn), nil) if err != nil { time.Sleep(time.Duration(30) * time.Second) @@ -30,13 +30,13 @@ func newMysqlGormDB() (*gorm.DB, error) { return nil, err } defer sqlDB.Close() - sql := fmt.Sprintf("CREATE DATABASE IF NOT EXISTS %s default charset utf8mb4 COLLATE utf8mb4_unicode_ci;", config.Config.Mysql.DBDatabaseName) + sql := fmt.Sprintf("CREATE DATABASE IF NOT EXISTS %s default charset utf8mb4 COLLATE utf8mb4_unicode_ci;", config.Config.Mysql.Database) err = db.Exec(sql).Error if err != nil { return nil, fmt.Errorf("init db %w", err) } dsn = fmt.Sprintf("%s:%s@tcp(%s)/%s?charset=utf8mb4&parseTime=true&loc=Local", - config.Config.Mysql.DBUserName, config.Config.Mysql.DBPassword, config.Config.Mysql.DBAddress[0], config.Config.Mysql.DBDatabaseName) + config.Config.Mysql.Username, config.Config.Mysql.Password, config.Config.Mysql.Address[0], config.Config.Mysql.Database) sqlLogger := log.NewSqlLogger(logger.LogLevel(config.Config.Mysql.LogLevel), true, time.Duration(config.Config.Mysql.SlowThreshold)*time.Millisecond) db, err = gorm.Open(mysql.Open(dsn), &gorm.Config{ Logger: sqlLogger, @@ -48,9 +48,9 @@ func newMysqlGormDB() (*gorm.DB, error) { if err != nil { return nil, err } - sqlDB.SetConnMaxLifetime(time.Second * time.Duration(config.Config.Mysql.DBMaxLifeTime)) - sqlDB.SetMaxOpenConns(config.Config.Mysql.DBMaxOpenConns) - sqlDB.SetMaxIdleConns(config.Config.Mysql.DBMaxIdleConns) + sqlDB.SetConnMaxLifetime(time.Second * time.Duration(config.Config.Mysql.MaxLifeTime)) + sqlDB.SetMaxOpenConns(config.Config.Mysql.MaxOpenConn) + sqlDB.SetMaxIdleConns(config.Config.Mysql.MaxIdleConn) return db, nil } diff --git a/pkg/common/db/unrelation/mongo.go b/pkg/common/db/unrelation/mongo.go index 6f384417b..6ca460ef8 100644 --- a/pkg/common/db/unrelation/mongo.go +++ b/pkg/common/db/unrelation/mongo.go @@ -23,27 +23,27 @@ type Mongo struct { func NewMongo() (*Mongo, error) { specialerror.AddReplace(mongo.ErrNoDocuments, errs.ErrRecordNotFound) uri := "mongodb://sample.host:27017/?maxPoolSize=20&w=majority" - if config.Config.Mongo.DBUri != "" { + if config.Config.Mongo.Uri != "" { // example: mongodb://$user:$password@mongo1.mongo:27017,mongo2.mongo:27017,mongo3.mongo:27017/$DBDatabase/?replicaSet=rs0&readPreference=secondary&authSource=admin&maxPoolSize=$DBMaxPoolSize - uri = config.Config.Mongo.DBUri + uri = config.Config.Mongo.Uri } else { //mongodb://mongodb1.example.com:27317,mongodb2.example.com:27017/?replicaSet=mySet&authSource=authDB mongodbHosts := "" - for i, v := range config.Config.Mongo.DBAddress { - if i == len(config.Config.Mongo.DBAddress)-1 { + for i, v := range config.Config.Mongo.Address { + if i == len(config.Config.Mongo.Address)-1 { mongodbHosts += v } else { mongodbHosts += v + "," } } - if config.Config.Mongo.DBPassword != "" && config.Config.Mongo.DBUserName != "" { + if config.Config.Mongo.Password != "" && config.Config.Mongo.Username != "" { uri = fmt.Sprintf("mongodb://%s:%s@%s/%s?maxPoolSize=%d&authSource=admin", - config.Config.Mongo.DBUserName, config.Config.Mongo.DBPassword, mongodbHosts, - config.Config.Mongo.DBDatabase, config.Config.Mongo.DBMaxPoolSize) + config.Config.Mongo.Username, config.Config.Mongo.Password, mongodbHosts, + config.Config.Mongo.Database, config.Config.Mongo.MaxPoolSize) } else { uri = fmt.Sprintf("mongodb://%s/%s/?maxPoolSize=%d&authSource=admin", - mongodbHosts, config.Config.Mongo.DBDatabase, - config.Config.Mongo.DBMaxPoolSize) + mongodbHosts, config.Config.Mongo.Database, + config.Config.Mongo.MaxPoolSize) } } fmt.Println("mongo:", uri) @@ -61,7 +61,7 @@ func (m *Mongo) GetClient() *mongo.Client { } func (m *Mongo) GetDatabase() *mongo.Database { - return m.db.Database(config.Config.Mongo.DBDatabase) + return m.db.Database(config.Config.Mongo.Database) } func (m *Mongo) CreateMsgIndex() error { @@ -83,7 +83,7 @@ func (m *Mongo) CreateExtendMsgSetIndex() error { } func (m *Mongo) createMongoIndex(collection string, isUnique bool, keys ...string) error { - db := m.db.Database(config.Config.Mongo.DBDatabase).Collection(collection) + db := m.db.Database(config.Config.Mongo.Database).Collection(collection) opts := options.CreateIndexes().SetMaxTime(10 * time.Second) indexView := db.Indexes() keysDoc := bsonx.Doc{} diff --git a/pkg/common/kafka/consumer.go b/pkg/common/kafka/consumer.go index 4b2039985..8a21a14d6 100644 --- a/pkg/common/kafka/consumer.go +++ b/pkg/common/kafka/consumer.go @@ -21,10 +21,10 @@ func NewKafkaConsumer(addr []string, topic string) *Consumer { p.Topic = topic p.addr = addr consumerConfig := sarama.NewConfig() - if config.Config.Kafka.SASLUserName != "" && config.Config.Kafka.SASLPassword != "" { + if config.Config.Kafka.Username != "" && config.Config.Kafka.Password != "" { consumerConfig.Net.SASL.Enable = true - consumerConfig.Net.SASL.User = config.Config.Kafka.SASLUserName - consumerConfig.Net.SASL.Password = config.Config.Kafka.SASLPassword + consumerConfig.Net.SASL.User = config.Config.Kafka.Username + consumerConfig.Net.SASL.Password = config.Config.Kafka.Password } consumer, err := sarama.NewConsumer(p.addr, consumerConfig) if err != nil { diff --git a/pkg/common/kafka/producer.go b/pkg/common/kafka/producer.go index f8e87883c..6e5868f24 100644 --- a/pkg/common/kafka/producer.go +++ b/pkg/common/kafka/producer.go @@ -32,10 +32,10 @@ func NewKafkaProducer(addr []string, topic string) *Producer { p.config.Producer.Return.Errors = true p.config.Producer.RequiredAcks = sarama.WaitForAll //Set producer Message Reply level 0 1 all p.config.Producer.Partitioner = sarama.NewHashPartitioner //Set the hash-key automatic hash partition. When sending a message, you must specify the key value of the message. If there is no key, the partition will be selected randomly - if config.Config.Kafka.SASLUserName != "" && config.Config.Kafka.SASLPassword != "" { + if config.Config.Kafka.Username != "" && config.Config.Kafka.Password != "" { p.config.Net.SASL.Enable = true - p.config.Net.SASL.User = config.Config.Kafka.SASLUserName - p.config.Net.SASL.Password = config.Config.Kafka.SASLPassword + p.config.Net.SASL.User = config.Config.Kafka.Username + p.config.Net.SASL.Password = config.Config.Kafka.Password } p.addr = addr p.topic = topic diff --git a/pkg/common/mw/check.go b/pkg/common/mw/check.go index 5c8a3ee0c..b1f375d08 100644 --- a/pkg/common/mw/check.go +++ b/pkg/common/mw/check.go @@ -26,7 +26,7 @@ func init() { func initAesKey() { once.Do(func() { - key := md5.Sum([]byte("openim:" + config.Config.Secret)) + key := md5.Sum([]byte("openim:" + config.Config.TokenPolicy.AccessSecret)) var err error block, err = aes.NewCipher(key[:]) if err != nil { diff --git a/pkg/common/mw/check_test.go b/pkg/common/mw/check_test.go index a70903810..88845af1b 100644 --- a/pkg/common/mw/check_test.go +++ b/pkg/common/mw/check_test.go @@ -7,7 +7,7 @@ import ( ) func TestCheck(t *testing.T) { - config.Config.Secret = "123456" + config.Config.TokenPolicy.Secret = "123456" args := []string{"1", "2", "3"} diff --git a/pkg/common/tokenverify/jwt_token.go b/pkg/common/tokenverify/jwt_token.go index bc7ca62e6..862d238cc 100644 --- a/pkg/common/tokenverify/jwt_token.go +++ b/pkg/common/tokenverify/jwt_token.go @@ -62,7 +62,7 @@ func GetClaimFromToken(tokensString string) (*Claims, error) { func CheckAccessV3(ctx context.Context, ownerUserID string) (err error) { opUserID := mcontext.GetOpUserID(ctx) - if utils.IsContain(opUserID, config.Config.Manager.AppManagerUid) { + if utils.IsContain(opUserID, config.Config.Manager.UserID) { return nil } if opUserID == ownerUserID { @@ -72,11 +72,11 @@ func CheckAccessV3(ctx context.Context, ownerUserID string) (err error) { } func IsAppManagerUid(ctx context.Context) bool { - return utils.IsContain(mcontext.GetOpUserID(ctx), config.Config.Manager.AppManagerUid) + return utils.IsContain(mcontext.GetOpUserID(ctx), config.Config.Manager.UserID) } func CheckAdmin(ctx context.Context) error { - if utils.IsContain(mcontext.GetOpUserID(ctx), config.Config.Manager.AppManagerUid) { + if utils.IsContain(mcontext.GetOpUserID(ctx), config.Config.Manager.UserID) { return nil } return errs.ErrIdentity.Wrap(fmt.Sprintf("user %s is not admin userID", mcontext.GetOpUserID(ctx))) @@ -86,7 +86,7 @@ func ParseRedisInterfaceToken(redisToken interface{}) (*Claims, error) { return GetClaimFromToken(string(redisToken.([]uint8))) } func IsManagerUserID(opUserID string) bool { - return utils.IsContain(opUserID, config.Config.Manager.AppManagerUid) + return utils.IsContain(opUserID, config.Config.Manager.UserID) } func WsVerifyToken(token, userID string, platformID int) error { claim, err := GetClaimFromToken(token) diff --git a/pkg/startrpc/start.go b/pkg/startrpc/start.go index d938aaada..c7490f4e0 100644 --- a/pkg/startrpc/start.go +++ b/pkg/startrpc/start.go @@ -21,23 +21,20 @@ import ( func Start(rpcPort int, rpcRegisterName string, prometheusPort int, rpcFn func(client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) error, options ...grpc.ServerOption) error { fmt.Println("start", rpcRegisterName, "server, port: ", rpcPort, "prometheusPort:", prometheusPort, ", OpenIM version: ", config.Version) - listener, err := net.Listen("tcp", net.JoinHostPort(config.Config.ListenIP, strconv.Itoa(rpcPort))) + listener, err := net.Listen("tcp", net.JoinHostPort(network.GetListenIP(config.Config.Rpc.ListenIP), strconv.Itoa(rpcPort))) if err != nil { return err } defer listener.Close() - client, err := openKeeper.NewClient(config.Config.Zookeeper.ZkAddr, config.Config.Zookeeper.Schema, - openKeeper.WithFreq(time.Hour), openKeeper.WithUserNameAndPassword(config.Config.Zookeeper.UserName, + zkClient, err := openKeeper.NewClient(config.Config.Zookeeper.ZkAddr, config.Config.Zookeeper.Schema, + openKeeper.WithFreq(time.Hour), openKeeper.WithUserNameAndPassword(config.Config.Zookeeper.Username, config.Config.Zookeeper.Password), openKeeper.WithRoundRobin(), openKeeper.WithTimeout(10), openKeeper.WithLogger(log.NewZkLogger())) if err != nil { return utils.Wrap1(err) } - if client.CreateRpcRootNodes(config.GetServiceNames()); err != nil { - return err - } - defer client.CloseZK() - client.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials())) - registerIP, err := network.GetRpcRegisterIP(config.Config.RpcRegisterIP) + defer zkClient.CloseZK() + zkClient.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials())) + registerIP, err := network.GetRpcRegisterIP(config.Config.Rpc.RegisterIP) if err != nil { return err } @@ -56,11 +53,11 @@ func Start(rpcPort int, rpcRegisterName string, prometheusPort int, rpcFn func(c } srv := grpc.NewServer(options...) defer srv.GracefulStop() - err = rpcFn(client, srv) + err = rpcFn(zkClient, srv) if err != nil { return utils.Wrap1(err) } - err = client.Register(rpcRegisterName, registerIP, rpcPort, grpc.WithTransportCredentials(insecure.NewCredentials())) + err = zkClient.Register(rpcRegisterName, registerIP, rpcPort, grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { return utils.Wrap1(err) }