refactor: extract nested structures in the config.

pull/2100/head
Gordon 2 years ago
parent a2ff3de7e4
commit 415793bcbf
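For orientation: throughout this commit, constructors that previously received the whole *config.GlobalConfig and reached into its nested fields now receive only the sub-structure they actually use (for example &config.Kafka, &config.Mongo, &config.Redis, &config.Push, &config.Callback). Below is a minimal, self-contained sketch of that pattern; the types are simplified hypothetical stand-ins, not the project's real config package.

```go
package main

import "fmt"

// Simplified stand-ins for illustration only; the real structs in the
// project's config package carry many more fields.
type Kafka struct {
	Addr     []string
	Username string
}

type GlobalConfig struct {
	Kafka Kafka
	// Mongo, Redis, Push, Callback, RpcRegisterName, ... omitted here
}

// Before the refactor: the constructor depended on the entire GlobalConfig
// even though it only needed the Kafka section.
func newConsumerOld(cfg *GlobalConfig) string {
	return fmt.Sprintf("consumer for %v as %s", cfg.Kafka.Addr, cfg.Kafka.Username)
}

// After the refactor: the caller extracts the nested structure once and the
// constructor depends only on what it uses, which narrows coupling and makes
// the component easier to construct in tests.
func newConsumerNew(kafkaConf *Kafka) string {
	return fmt.Sprintf("consumer for %v as %s", kafkaConf.Addr, kafkaConf.Username)
}

func main() {
	cfg := &GlobalConfig{Kafka: Kafka{Addr: []string{"localhost:9092"}, Username: "openim"}}
	fmt.Println(newConsumerOld(cfg))
	fmt.Println(newConsumerNew(&cfg.Kafka))
}
```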

@ -51,7 +51,6 @@ type MsgTransfer struct {
// and handle the deletion notification message deleted subscriptions topic: msg_to_mongo
ctx context.Context
cancel context.CancelFunc
config *config.GlobalConfig
}
func StartTransfer(config *config.GlobalConfig, prometheusPort int) error {
@ -60,7 +59,7 @@ func StartTransfer(config *config.GlobalConfig, prometheusPort int) error {
return err
}
mongo, err := unrelation.NewMongo(config)
mongo, err := unrelation.NewMongo(&config.Mongo)
if err != nil {
return err
}
@ -78,15 +77,15 @@ func StartTransfer(config *config.GlobalConfig, prometheusPort int) error {
}
client.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, "round_robin")))
msgModel := cache.NewMsgCacheModel(rdb, config)
msgModel := cache.NewMsgCacheModel(rdb, config.MsgCacheTimeout, &config.Redis)
msgDocModel := unrelation.NewMsgMongoDriver(mongo.GetDatabase(config.Mongo.Database))
msgDatabase, err := controller.NewCommonMsgDatabase(msgDocModel, msgModel, config)
msgDatabase, err := controller.NewCommonMsgDatabase(msgDocModel, msgModel, &config.Kafka)
if err != nil {
return err
}
conversationRpcClient := rpcclient.NewConversationRpcClient(client, config)
groupRpcClient := rpcclient.NewGroupRpcClient(client, config)
msgTransfer, err := NewMsgTransfer(config, msgDatabase, &conversationRpcClient, &groupRpcClient)
conversationRpcClient := rpcclient.NewConversationRpcClient(client, config.RpcRegisterName.OpenImConversationName)
groupRpcClient := rpcclient.NewGroupRpcClient(client, config.RpcRegisterName.OpenImGroupName)
msgTransfer, err := NewMsgTransfer(&config.Kafka, msgDatabase, &conversationRpcClient, &groupRpcClient)
if err != nil {
return err
}
@ -94,16 +93,16 @@ func StartTransfer(config *config.GlobalConfig, prometheusPort int) error {
}
func NewMsgTransfer(
config *config.GlobalConfig,
kafkaConf *config.Kafka,
msgDatabase controller.CommonMsgDatabase,
conversationRpcClient *rpcclient.ConversationRpcClient,
groupRpcClient *rpcclient.GroupRpcClient,
) (*MsgTransfer, error) {
historyCH, err := NewOnlineHistoryRedisConsumerHandler(config, msgDatabase, conversationRpcClient, groupRpcClient)
historyCH, err := NewOnlineHistoryRedisConsumerHandler(kafkaConf, msgDatabase, conversationRpcClient, groupRpcClient)
if err != nil {
return nil, err
}
historyMongoCH, err := NewOnlineHistoryMongoConsumerHandler(config, msgDatabase)
historyMongoCH, err := NewOnlineHistoryMongoConsumerHandler(kafkaConf, msgDatabase)
if err != nil {
return nil, err
}
@ -111,7 +110,6 @@ func NewMsgTransfer(
return &MsgTransfer{
historyCH: historyCH,
historyMongoCH: historyMongoCH,
config: config,
}, nil
}
@ -136,7 +134,7 @@ func (m *MsgTransfer) Start(prometheusPort int, config *config.GlobalConfig) err
proreg.MustRegister(
collectors.NewGoCollector(),
)
proreg.MustRegister(prommetrics.GetGrpcCusMetrics("Transfer", config)...)
proreg.MustRegister(prommetrics.GetGrpcCusMetrics("Transfer", &config.RpcRegisterName)...)
http.Handle("/metrics", promhttp.HandlerFor(proreg, promhttp.HandlerOpts{Registry: proreg}))
err := http.ListenAndServe(fmt.Sprintf(":%d", prometheusPort), nil)
if err != nil && err != http.ErrServerClosed {

@ -82,7 +82,7 @@ type OnlineHistoryRedisConsumerHandler struct {
}
func NewOnlineHistoryRedisConsumerHandler(
config *config.GlobalConfig,
kafkaConf *config.Kafka,
database controller.CommonMsgDatabase,
conversationRpcClient *rpcclient.ConversationRpcClient,
groupRpcClient *rpcclient.GroupRpcClient,
@ -100,12 +100,12 @@ func NewOnlineHistoryRedisConsumerHandler(
var err error
var tlsConfig *kafka.TLSConfig
if config.Kafka.TLS != nil {
if kafkaConf.TLS != nil {
tlsConfig = &kafka.TLSConfig{
CACrt: config.Kafka.TLS.CACrt,
ClientCrt: config.Kafka.TLS.ClientCrt,
ClientKey: config.Kafka.TLS.ClientKey,
ClientKeyPwd: config.Kafka.TLS.ClientKeyPwd,
CACrt: kafkaConf.TLS.CACrt,
ClientCrt: kafkaConf.TLS.ClientCrt,
ClientKey: kafkaConf.TLS.ClientKey,
ClientKeyPwd: kafkaConf.TLS.ClientKeyPwd,
InsecureSkipVerify: false,
}
}
@ -114,11 +114,11 @@ func NewOnlineHistoryRedisConsumerHandler(
KafkaVersion: sarama.V2_0_0_0,
OffsetsInitial: sarama.OffsetNewest,
IsReturnErr: false,
UserName: config.Kafka.Username,
Password: config.Kafka.Password,
}, []string{config.Kafka.LatestMsgToRedis.Topic},
config.Kafka.Addr,
config.Kafka.ConsumerGroupID.MsgToRedis,
UserName: kafkaConf.Username,
Password: kafkaConf.Password,
}, []string{kafkaConf.LatestMsgToRedis.Topic},
kafkaConf.Addr,
kafkaConf.ConsumerGroupID.MsgToRedis,
tlsConfig,
)
// statistics.NewStatistics(&och.singleMsgSuccessCount, config.Config.ModuleName.MsgTransferName, fmt.Sprintf("%d

@ -33,14 +33,14 @@ type OnlineHistoryMongoConsumerHandler struct {
msgDatabase controller.CommonMsgDatabase
}
func NewOnlineHistoryMongoConsumerHandler(config *config.GlobalConfig, database controller.CommonMsgDatabase) (*OnlineHistoryMongoConsumerHandler, error) {
func NewOnlineHistoryMongoConsumerHandler(kafkaConf *config.Kafka, database controller.CommonMsgDatabase) (*OnlineHistoryMongoConsumerHandler, error) {
var tlsConfig *kfk.TLSConfig
if config.Kafka.TLS != nil {
if kafkaConf.TLS != nil {
tlsConfig = &kfk.TLSConfig{
CACrt: config.Kafka.TLS.CACrt,
ClientCrt: config.Kafka.TLS.ClientCrt,
ClientKey: config.Kafka.TLS.ClientKey,
ClientKeyPwd: config.Kafka.TLS.ClientKeyPwd,
CACrt: kafkaConf.TLS.CACrt,
ClientCrt: kafkaConf.TLS.ClientCrt,
ClientKey: kafkaConf.TLS.ClientKey,
ClientKeyPwd: kafkaConf.TLS.ClientKeyPwd,
InsecureSkipVerify: false,
}
}
@ -48,11 +48,11 @@ func NewOnlineHistoryMongoConsumerHandler(config *config.GlobalConfig, database
KafkaVersion: sarama.V2_0_0_0,
OffsetsInitial: sarama.OffsetNewest,
IsReturnErr: false,
UserName: config.Kafka.Username,
Password: config.Kafka.Password,
}, []string{config.Kafka.MsgToMongo.Topic},
config.Kafka.Addr,
config.Kafka.ConsumerGroupID.MsgToMongo,
UserName: kafkaConf.Username,
Password: kafkaConf.Password,
}, []string{kafkaConf.MsgToMongo.Topic},
kafkaConf.Addr,
kafkaConf.ConsumerGroupID.MsgToMongo,
tlsConfig,
)
if err != nil {

@ -29,12 +29,12 @@ import (
func callbackOfflinePush(
ctx context.Context,
config *config.GlobalConfig,
callback *config.Callback,
userIDs []string,
msg *sdkws.MsgData,
offlinePushUserIDs *[]string,
) error {
if !config.Callback.CallbackOfflinePush.Enable || msg.ContentType == constant.Typing {
if !callback.CallbackOfflinePush.Enable || msg.ContentType == constant.Typing {
return nil
}
req := &callbackstruct.CallbackBeforePushReq{
@ -58,7 +58,7 @@ func callbackOfflinePush(
}
resp := &callbackstruct.CallbackBeforePushResp{}
if err := http.CallBackPostReturn(ctx, config.Callback.CallbackUrl, req, resp, config.Callback.CallbackOfflinePush); err != nil {
if err := http.CallBackPostReturn(ctx, callback.CallbackUrl, req, resp, callback.CallbackOfflinePush); err != nil {
return err
}
@ -71,8 +71,8 @@ func callbackOfflinePush(
return nil
}
func callbackOnlinePush(ctx context.Context, config *config.GlobalConfig, userIDs []string, msg *sdkws.MsgData) error {
if !config.Callback.CallbackOnlinePush.Enable || utils.Contain(msg.SendID, userIDs...) || msg.ContentType == constant.Typing {
func callbackOnlinePush(ctx context.Context, callback *config.Callback, userIDs []string, msg *sdkws.MsgData) error {
if !callback.CallbackOnlinePush.Enable || utils.Contain(msg.SendID, userIDs...) || msg.ContentType == constant.Typing {
return nil
}
req := callbackstruct.CallbackBeforePushReq{
@ -94,7 +94,7 @@ func callbackOnlinePush(ctx context.Context, config *config.GlobalConfig, userID
Content: GetContent(msg),
}
resp := &callbackstruct.CallbackBeforePushResp{}
if err := http.CallBackPostReturn(ctx, config.Callback.CallbackUrl, req, resp, config.Callback.CallbackOnlinePush); err != nil {
if err := http.CallBackPostReturn(ctx, callback.CallbackUrl, req, resp, callback.CallbackOnlinePush); err != nil {
return err
}
return nil
@ -102,12 +102,12 @@ func callbackOnlinePush(ctx context.Context, config *config.GlobalConfig, userID
func callbackBeforeSuperGroupOnlinePush(
ctx context.Context,
config *config.GlobalConfig,
callback *config.Callback,
groupID string,
msg *sdkws.MsgData,
pushToUserIDs *[]string,
) error {
if !config.Callback.CallbackBeforeSuperGroupOnlinePush.Enable || msg.ContentType == constant.Typing {
if !callback.CallbackBeforeSuperGroupOnlinePush.Enable || msg.ContentType == constant.Typing {
return nil
}
req := callbackstruct.CallbackBeforeSuperGroupOnlinePushReq{
@ -127,7 +127,7 @@ func callbackBeforeSuperGroupOnlinePush(
Seq: msg.Seq,
}
resp := &callbackstruct.CallbackBeforeSuperGroupOnlinePushResp{}
if err := http.CallBackPostReturn(ctx, config.Callback.CallbackUrl, req, resp, config.Callback.CallbackBeforeSuperGroupOnlinePush); err != nil {
if err := http.CallBackPostReturn(ctx, callback.CallbackUrl, req, resp, callback.CallbackBeforeSuperGroupOnlinePush); err != nil {
return err
}
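The callback helpers above now take a *config.Callback rather than the full GlobalConfig. A rough sketch of the shape that parameter needs, inferred only from the fields these three functions read (CallbackUrl and the per-callback Enable flag); the authoritative definition lives in the project's config package and is richer than this.

```go
package config

// Inferred sketch, not the project's actual declaration.
type CallbackOption struct {
	Enable bool
	// additional settings (e.g. timeouts) likely exist in the real config
	// but are not visible in this diff, so they are omitted here
}

type Callback struct {
	CallbackUrl                        string
	CallbackOfflinePush                CallbackOption
	CallbackOnlinePush                 CallbackOption
	CallbackBeforeSuperGroupOnlinePush CallbackOption
}
```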

@ -26,8 +26,8 @@ type Consumer struct {
// successCount uint64
}
func NewConsumer(config *config.GlobalConfig, pusher *Pusher) (*Consumer, error) {
c, err := NewConsumerHandler(config, pusher)
func NewConsumer(kafkaConf *config.Kafka, pusher *Pusher) (*Consumer, error) {
c, err := NewConsumerHandler(kafkaConf, pusher)
if err != nil {
return nil, err
}

@ -40,9 +40,9 @@ type Fcm struct {
// NewClient initializes a new FCM client using the Firebase Admin SDK.
// It requires the FCM service account credentials file located within the project's configuration directory.
func NewClient(globalConfig *config.GlobalConfig, cache cache.MsgModel) *Fcm {
func NewClient(pushConf *config.Push, cache cache.MsgModel) *Fcm {
projectRoot := config.GetProjectRoot()
credentialsFilePath := filepath.Join(projectRoot, "config", globalConfig.Push.Fcm.ServiceAccount)
credentialsFilePath := filepath.Join(projectRoot, "config", pushConf.Fcm.ServiceAccount)
opt := option.WithCredentialsFile(credentialsFilePath)
fcmApp, err := firebase.NewApp(context.Background(), nil, opt)
if err != nil {

@ -133,13 +133,13 @@ type Payload struct {
IsSignal bool `json:"isSignal"`
}
func newPushReq(config *config.GlobalConfig, title, content string) PushReq {
func newPushReq(pushConf *config.Push, title, content string) PushReq {
pushReq := PushReq{PushMessage: &PushMessage{Notification: &Notification{
Title: title,
Body: content,
ClickType: "startapp",
ChannelID: config.Push.GeTui.ChannelID,
ChannelName: config.Push.GeTui.ChannelName,
ChannelID: pushConf.GeTui.ChannelID,
ChannelName: pushConf.GeTui.ChannelName,
}}}
return pushReq
}

@ -56,14 +56,14 @@ type Client struct {
cache cache.MsgModel
tokenExpireTime int64
taskIDTTL int64
config *config.GlobalConfig
pushConf *config.Push
}
func NewClient(config *config.GlobalConfig, cache cache.MsgModel) *Client {
func NewClient(pushConf *config.Push, cache cache.MsgModel) *Client {
return &Client{cache: cache,
tokenExpireTime: tokenExpireTime,
taskIDTTL: taskIDTTL,
config: config,
pushConf: pushConf,
}
}
@ -80,7 +80,7 @@ func (g *Client) Push(ctx context.Context, userIDs []string, title, content stri
return err
}
}
pushReq := newPushReq(g.config, title, content)
pushReq := newPushReq(g.pushConf, title, content)
pushReq.setPushChannel(title, content)
if len(userIDs) > 1 {
maxNum := 999
@ -116,13 +116,13 @@ func (g *Client) Push(ctx context.Context, userIDs []string, title, content stri
func (g *Client) Auth(ctx context.Context, timeStamp int64) (token string, expireTime int64, err error) {
h := sha256.New()
h.Write(
[]byte(g.config.Push.GeTui.AppKey + strconv.Itoa(int(timeStamp)) + g.config.Push.GeTui.MasterSecret),
[]byte(g.pushConf.GeTui.AppKey + strconv.Itoa(int(timeStamp)) + g.pushConf.GeTui.MasterSecret),
)
sign := hex.EncodeToString(h.Sum(nil))
reqAuth := AuthReq{
Sign: sign,
Timestamp: strconv.Itoa(int(timeStamp)),
AppKey: g.config.Push.GeTui.AppKey,
AppKey: g.pushConf.GeTui.AppKey,
}
respAuth := AuthResp{}
err = g.request(ctx, authURL, reqAuth, "", &respAuth)
@ -165,7 +165,7 @@ func (g *Client) request(ctx context.Context, url string, input any, token strin
header := map[string]string{"token": token}
resp := &Resp{}
resp.Data = output
return g.postReturn(ctx, g.config.Push.GeTui.PushUrl+url, header, input, resp, 3)
return g.postReturn(ctx, g.pushConf.GeTui.PushUrl+url, header, input, resp, 3)
}
func (g *Client) postReturn(

@ -56,8 +56,8 @@ func (n *Notification) SetExtras(extras Extras) {
n.Android.Extras = extras
}
func (n *Notification) SetAndroidIntent(config *config.GlobalConfig) {
n.Android.Intent.URL = config.Push.Jpns.PushIntent
func (n *Notification) SetAndroidIntent(pushConf *config.Push) {
n.Android.Intent.URL = pushConf.Jpns.PushIntent
}
func (n *Notification) IOSEnableMutableContent() {

@ -26,11 +26,12 @@ import (
)
type JPush struct {
config *config.GlobalConfig
pushConf *config.Push
iOSPushConf *config.IOSPush
}
func NewClient(config *config.GlobalConfig) *JPush {
return &JPush{config: config}
func NewClient(pushConf *config.Push, iOSPushConf *config.IOSPush) *JPush {
return &JPush{pushConf: pushConf, iOSPushConf: iOSPushConf}
}
func (j *JPush) Auth(apiKey, secretKey string, timeStamp int64) (token string, err error) {
@ -61,12 +62,12 @@ func (j *JPush) Push(ctx context.Context, userIDs []string, title, content strin
no.IOSEnableMutableContent()
no.SetExtras(extras)
no.SetAlert(title)
no.SetAndroidIntent(j.config)
no.SetAndroidIntent(j.pushConf)
var msg body.Message
msg.SetMsgContent(content)
var opt body.Options
opt.SetApnsProduction(j.config.IOSPush.Production)
opt.SetApnsProduction(j.iOSPushConf.Production)
var pushObj body.PushObj
pushObj.SetPlatform(&pf)
pushObj.SetAudience(&au)
@ -80,9 +81,9 @@ func (j *JPush) Push(ctx context.Context, userIDs []string, title, content strin
func (j *JPush) request(ctx context.Context, po body.PushObj, resp any, timeout int) error {
return http2.PostReturn(
ctx,
j.config.Push.Jpns.PushUrl,
j.pushConf.Jpns.PushUrl,
map[string]string{
"Authorization": j.getAuthorization(j.config.Push.Jpns.AppKey, j.config.Push.Jpns.MasterSecret),
"Authorization": j.getAuthorization(j.pushConf.Jpns.AppKey, j.pushConf.Jpns.MasterSecret),
},
po,
resp,

@ -34,17 +34,17 @@ type ConsumerHandler struct {
pusher *Pusher
}
func NewConsumerHandler(config *config.GlobalConfig, pusher *Pusher) (*ConsumerHandler, error) {
func NewConsumerHandler(kafkaConf *config.Kafka, pusher *Pusher) (*ConsumerHandler, error) {
var consumerHandler ConsumerHandler
consumerHandler.pusher = pusher
var err error
var tlsConfig *kfk.TLSConfig
if config.Kafka.TLS != nil {
if kafkaConf.TLS != nil {
tlsConfig = &kfk.TLSConfig{
CACrt: config.Kafka.TLS.CACrt,
ClientCrt: config.Kafka.TLS.ClientCrt,
ClientKey: config.Kafka.TLS.ClientKey,
ClientKeyPwd: config.Kafka.TLS.ClientKeyPwd,
CACrt: kafkaConf.TLS.CACrt,
ClientCrt: kafkaConf.TLS.ClientCrt,
ClientKey: kafkaConf.TLS.ClientKey,
ClientKeyPwd: kafkaConf.TLS.ClientKeyPwd,
InsecureSkipVerify: false,
}
}
@ -52,10 +52,10 @@ func NewConsumerHandler(config *config.GlobalConfig, pusher *Pusher) (*ConsumerH
KafkaVersion: sarama.V2_0_0_0,
OffsetsInitial: sarama.OffsetNewest,
IsReturnErr: false,
UserName: config.Kafka.Username,
Password: config.Kafka.Password,
}, []string{config.Kafka.MsgToPush.Topic}, config.Kafka.Addr,
config.Kafka.ConsumerGroupID.MsgToPush,
UserName: kafkaConf.Username,
Password: kafkaConf.Password,
}, []string{kafkaConf.MsgToPush.Topic}, kafkaConf.Addr,
kafkaConf.ConsumerGroupID.MsgToPush,
tlsConfig)
if err != nil {
return nil, err

@ -33,20 +33,19 @@ import (
type pushServer struct {
pusher *Pusher
config *config.GlobalConfig
}
func Start(config *config.GlobalConfig, client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) error {
rdb, err := cache.NewRedis(config)
rdb, err := cache.NewRedis(&config.Redis)
if err != nil {
return err
}
cacheModel := cache.NewMsgCacheModel(rdb, config)
offlinePusher := NewOfflinePusher(config, cacheModel)
cacheModel := cache.NewMsgCacheModel(rdb, config.MsgCacheTimeout, &config.Redis)
offlinePusher := NewOfflinePusher(&config.Push, &config.IOSPush, cacheModel)
database := controller.NewPushDatabase(cacheModel)
groupRpcClient := rpcclient.NewGroupRpcClient(client, config)
conversationRpcClient := rpcclient.NewConversationRpcClient(client, config)
msgRpcClient := rpcclient.NewMessageRpcClient(client, config)
groupRpcClient := rpcclient.NewGroupRpcClient(client, config.RpcRegisterName.OpenImGroupName)
conversationRpcClient := rpcclient.NewConversationRpcClient(client, config.RpcRegisterName.OpenImConversationName)
msgRpcClient := rpcclient.NewMessageRpcClient(client, config.RpcRegisterName.OpenImMsgName)
pusher := NewPusher(
config,
client,
@ -61,10 +60,9 @@ func Start(config *config.GlobalConfig, client discoveryregistry.SvcDiscoveryReg
pbpush.RegisterPushMsgServiceServer(server, &pushServer{
pusher: pusher,
config: config,
})
consumer, err := NewConsumer(config, pusher)
consumer, err := NewConsumer(&config.Kafka, pusher)
if err != nil {
return err
}

@ -76,15 +76,15 @@ func NewPusher(config *config.GlobalConfig, discov discoveryregistry.SvcDiscover
}
}
func NewOfflinePusher(config *config.GlobalConfig, cache cache.MsgModel) offlinepush.OfflinePusher {
func NewOfflinePusher(pushConf *config.Push, iOSPushConf *config.IOSPush, cache cache.MsgModel) offlinepush.OfflinePusher {
var offlinePusher offlinepush.OfflinePusher
switch config.Push.Enable {
switch pushConf.Enable {
case "getui":
offlinePusher = getui.NewClient(config, cache)
offlinePusher = getui.NewClient(pushConf, cache)
case "fcm":
offlinePusher = fcm.NewClient(config, cache)
offlinePusher = fcm.NewClient(pushConf, cache)
case "jpush":
offlinePusher = jpush.NewClient(config)
offlinePusher = jpush.NewClient(pushConf, iOSPushConf)
default:
offlinePusher = dummy.NewClient()
}
@ -102,7 +102,7 @@ func (p *Pusher) DeleteMemberAndSetConversationSeq(ctx context.Context, groupID
func (p *Pusher) Push2User(ctx context.Context, userIDs []string, msg *sdkws.MsgData) error {
log.ZDebug(ctx, "Get msg from msg_transfer And push msg", "userIDs", userIDs, "msg", msg.String())
if err := callbackOnlinePush(ctx, p.config, userIDs, msg); err != nil {
if err := callbackOnlinePush(ctx, &p.config.Callback, userIDs, msg); err != nil {
return err
}
// push
@ -130,7 +130,7 @@ func (p *Pusher) Push2User(ctx context.Context, userIDs []string, msg *sdkws.Msg
})
if len(offlinePushUserIDList) > 0 {
if err = callbackOfflinePush(ctx, p.config, offlinePushUserIDList, msg, &[]string{}); err != nil {
if err = callbackOfflinePush(ctx, &p.config.Callback, offlinePushUserIDList, msg, &[]string{}); err != nil {
return err
}
err = p.offlinePushMsg(ctx, msg.SendID, msg, offlinePushUserIDList)
@ -163,7 +163,7 @@ func (p *Pusher) k8sOfflinePush2SuperGroup(ctx context.Context, groupID string,
}
if len(needOfflinePushUserIDs) > 0 {
var offlinePushUserIDs []string
err := callbackOfflinePush(ctx, p.config, needOfflinePushUserIDs, msg, &offlinePushUserIDs)
err := callbackOfflinePush(ctx, &p.config.Callback, needOfflinePushUserIDs, msg, &offlinePushUserIDs)
if err != nil {
return err
}
@ -194,7 +194,7 @@ func (p *Pusher) k8sOfflinePush2SuperGroup(ctx context.Context, groupID string,
func (p *Pusher) Push2SuperGroup(ctx context.Context, groupID string, msg *sdkws.MsgData) (err error) {
log.ZDebug(ctx, "Get super group msg from msg_transfer and push msg", "msg", msg.String(), "groupID", groupID)
var pushToUserIDs []string
if err = callbackBeforeSuperGroupOnlinePush(ctx, p.config, groupID, msg, &pushToUserIDs); err != nil {
if err = callbackBeforeSuperGroupOnlinePush(ctx, &p.config.Callback, groupID, msg, &pushToUserIDs); err != nil {
return err
}
@ -299,7 +299,7 @@ func (p *Pusher) Push2SuperGroup(ctx context.Context, groupID string, msg *sdkws
// Use offline push messaging
if len(needOfflinePushUserIDs) > 0 {
var offlinePushUserIDs []string
err = callbackOfflinePush(ctx, p.config, needOfflinePushUserIDs, msg, &offlinePushUserIDs)
err = callbackOfflinePush(ctx, &p.config.Callback, needOfflinePushUserIDs, msg, &offlinePushUserIDs)
if err != nil {
return err
}
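NewOfflinePusher now selects a provider based on pushConf.Enable and hands each client only push-related configuration. The sketch below collects the Push and IOSPush fields that the GeTui, FCM, and JPush clients read in this diff; treat it as an inferred shape with guessed types, not the project's actual definition.

```go
package config

// Inferred from field accesses in this diff only; field types are guesses.
type Push struct {
	Enable string // "getui", "fcm", "jpush"; anything else falls back to the dummy pusher
	GeTui  struct {
		PushUrl      string
		AppKey       string
		MasterSecret string
		ChannelID    string
		ChannelName  string
	}
	Fcm struct {
		ServiceAccount string // credentials file resolved under the project's config directory
	}
	Jpns struct {
		AppKey       string
		MasterSecret string
		PushUrl      string
		PushIntent   string
	}
}

type IOSPush struct {
	Production bool
}
```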

@ -121,33 +121,33 @@ type CommonMsgDatabase interface {
ConvertMsgsDocLen(ctx context.Context, conversationIDs []string)
}
func NewCommonMsgDatabase(msgDocModel unrelationtb.MsgDocModelInterface, cacheModel cache.MsgModel, config *config.GlobalConfig) (CommonMsgDatabase, error) {
func NewCommonMsgDatabase(msgDocModel unrelationtb.MsgDocModelInterface, cacheModel cache.MsgModel, kafkaConf *config.Kafka) (CommonMsgDatabase, error) {
producerConfig := &kafka.ProducerConfig{
ProducerAck: config.Kafka.ProducerAck,
CompressType: config.Kafka.CompressType,
Username: config.Kafka.Username,
Password: config.Kafka.Password,
ProducerAck: kafkaConf.ProducerAck,
CompressType: kafkaConf.CompressType,
Username: kafkaConf.Username,
Password: kafkaConf.Password,
}
var tlsConfig *kafka.TLSConfig
if config.Kafka.TLS != nil {
if kafkaConf.TLS != nil {
tlsConfig = &kafka.TLSConfig{
CACrt: config.Kafka.TLS.CACrt,
ClientCrt: config.Kafka.TLS.ClientCrt,
ClientKey: config.Kafka.TLS.ClientKey,
ClientKeyPwd: config.Kafka.TLS.ClientKeyPwd,
CACrt: kafkaConf.TLS.CACrt,
ClientCrt: kafkaConf.TLS.ClientCrt,
ClientKey: kafkaConf.TLS.ClientKey,
ClientKeyPwd: kafkaConf.TLS.ClientKeyPwd,
InsecureSkipVerify: false,
}
}
producerToRedis, err := kafka.NewKafkaProducer(config.Kafka.Addr, config.Kafka.LatestMsgToRedis.Topic, producerConfig, tlsConfig)
producerToRedis, err := kafka.NewKafkaProducer(kafkaConf.Addr, kafkaConf.LatestMsgToRedis.Topic, producerConfig, tlsConfig)
if err != nil {
return nil, err
}
producerToMongo, err := kafka.NewKafkaProducer(config.Kafka.Addr, config.Kafka.MsgToMongo.Topic, producerConfig, tlsConfig)
producerToMongo, err := kafka.NewKafkaProducer(kafkaConf.Addr, kafkaConf.MsgToMongo.Topic, producerConfig, tlsConfig)
if err != nil {
return nil, err
}
producerToPush, err := kafka.NewKafkaProducer(config.Kafka.Addr, config.Kafka.MsgToPush.Topic, producerConfig, tlsConfig)
producerToPush, err := kafka.NewKafkaProducer(kafkaConf.Addr, kafkaConf.MsgToPush.Topic, producerConfig, tlsConfig)
if err != nil {
return nil, err
}
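NewCommonMsgDatabase, the two history consumer handlers, and the push consumer now all accept the same *config.Kafka. Collecting the fields they read gives a rough picture of that extracted structure; again this is inferred from the diff, with guessed field types, not copied from the config package.

```go
package config

// Inferred sketch of the Kafka sub-config used above.
type TopicConf struct{ Topic string }

type Kafka struct {
	Addr         []string
	Username     string
	Password     string
	ProducerAck  string // type guessed; only passed through to the producer config here
	CompressType string // type guessed
	TLS          *struct {
		CACrt        string
		ClientCrt    string
		ClientKey    string
		ClientKeyPwd string
	} // nil means plaintext; handlers above only build a TLS config when it is set
	LatestMsgToRedis TopicConf // consumed by the Redis history handler
	MsgToMongo       TopicConf // consumed by the Mongo history handler
	MsgToPush        TopicConf // consumed by the push service
	ConsumerGroupID  struct {
		MsgToRedis string
		MsgToMongo string
		MsgToPush  string
	}
}
```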

@ -37,14 +37,14 @@ const (
)
type Mongo struct {
db *mongo.Client
config *config.GlobalConfig
db *mongo.Client
mongoConf *config.Mongo
}
// NewMongo Initialize MongoDB connection.
func NewMongo(config *config.GlobalConfig) (*Mongo, error) {
func NewMongo(mongoConf *config.Mongo) (*Mongo, error) {
specialerror.AddReplace(mongo.ErrNoDocuments, errs.ErrRecordNotFound)
uri := buildMongoURI(config)
uri := buildMongoURI(mongoConf)
var mongoClient *mongo.Client
var err error
@ -58,7 +58,7 @@ func NewMongo(config *config.GlobalConfig) (*Mongo, error) {
if err = mongoClient.Ping(ctx, nil); err != nil {
return nil, errs.Wrap(err, uri)
}
return &Mongo{db: mongoClient, config: config}, nil
return &Mongo{db: mongoClient, mongoConf: mongoConf}, nil
}
if shouldRetry(err) {
time.Sleep(time.Second) // exponential backoff could be implemented here
@ -68,14 +68,14 @@ func NewMongo(config *config.GlobalConfig) (*Mongo, error) {
return nil, errs.Wrap(err, uri)
}
func buildMongoURI(config *config.GlobalConfig) string {
func buildMongoURI(mongoConf *config.Mongo) string {
uri := os.Getenv("MONGO_URI")
if uri != "" {
return uri
}
if config.Mongo.Uri != "" {
return config.Mongo.Uri
if mongoConf.Uri != "" {
return mongoConf.Uri
}
username := os.Getenv("MONGO_OPENIM_USERNAME")
@ -86,21 +86,21 @@ func buildMongoURI(config *config.GlobalConfig) string {
maxPoolSize := os.Getenv("MONGO_MAX_POOL_SIZE")
if username == "" {
username = config.Mongo.Username
username = mongoConf.Username
}
if password == "" {
password = config.Mongo.Password
password = mongoConf.Password
}
if address == "" {
address = strings.Join(config.Mongo.Address, ",")
address = strings.Join(mongoConf.Address, ",")
} else if port != "" {
address = fmt.Sprintf("%s:%s", address, port)
}
if database == "" {
database = config.Mongo.Database
database = mongoConf.Database
}
if maxPoolSize == "" {
maxPoolSize = fmt.Sprint(config.Mongo.MaxPoolSize)
maxPoolSize = fmt.Sprint(mongoConf.MaxPoolSize)
}
if username != "" && password != "" {
@ -134,7 +134,7 @@ func (m *Mongo) CreateMsgIndex() error {
// createMongoIndex creates an index in a MongoDB collection.
func (m *Mongo) createMongoIndex(collection string, isUnique bool, keys ...string) error {
db := m.GetDatabase(m.config.Mongo.Database).Collection(collection)
db := m.GetDatabase(m.mongoConf.Database).Collection(collection)
opts := options.CreateIndexes().SetMaxTime(10 * time.Second)
indexView := db.Indexes()
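NewMongo and buildMongoURI now work from a *config.Mongo, with environment variables (MONGO_URI, MONGO_OPENIM_USERNAME, and the related address/port/database/pool-size variables) still taking precedence over the struct fields. The shape implied by the fields read above, as an inferred sketch with guessed types:

```go
package config

// Inferred from the fields buildMongoURI and createMongoIndex read; types are
// best guesses (MaxPoolSize, for instance, is only ever passed to fmt.Sprint).
type Mongo struct {
	Uri         string   // used verbatim when set, unless MONGO_URI overrides it
	Address     []string // host:port entries, joined with commas when no env override is given
	Username    string
	Password    string
	Database    string
	MaxPoolSize int
}
```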

@ -32,17 +32,17 @@ func NewGrpcPromObj(cusMetrics []prometheus.Collector) (*prometheus.Registry, *g
return reg, grpcMetrics, nil
}
func GetGrpcCusMetrics(registerName string, config *config2.GlobalConfig) []prometheus.Collector {
func GetGrpcCusMetrics(registerName string, rpcRegisterName *config2.RpcRegisterName) []prometheus.Collector {
switch registerName {
case config.RpcRegisterName.OpenImMessageGatewayName:
case rpcRegisterName.OpenImMessageGatewayName:
return []prometheus.Collector{OnlineUserGauge}
case config.RpcRegisterName.OpenImMsgName:
case rpcRegisterName.OpenImMsgName:
return []prometheus.Collector{SingleChatMsgProcessSuccessCounter, SingleChatMsgProcessFailedCounter, GroupChatMsgProcessSuccessCounter, GroupChatMsgProcessFailedCounter}
case "Transfer":
return []prometheus.Collector{MsgInsertRedisSuccessCounter, MsgInsertRedisFailedCounter, MsgInsertMongoSuccessCounter, MsgInsertMongoFailedCounter, SeqSetFailedCounter}
case config.RpcRegisterName.OpenImPushName:
case rpcRegisterName.OpenImPushName:
return []prometheus.Collector{MsgOfflinePushFailedCounter}
case config.RpcRegisterName.OpenImAuthName:
case rpcRegisterName.OpenImAuthName:
return []prometheus.Collector{UserLoginCounter}
default:
return nil
