From 415793bcbfe3815b4934e53e6d02104e0ba4b466 Mon Sep 17 00:00:00 2001 From: Gordon <1432970085@qq.com> Date: Thu, 14 Mar 2024 10:56:18 +0800 Subject: [PATCH] refactor: extract nested structures in the config. --- internal/msgtransfer/init.go | 22 +++++++-------- .../msgtransfer/online_history_msg_handler.go | 22 +++++++-------- .../online_msg_to_mongo_handler.go | 22 +++++++-------- internal/push/callback.go | 18 ++++++------ internal/push/consumer_init.go | 4 +-- internal/push/offlinepush/fcm/push.go | 4 +-- internal/push/offlinepush/getui/body.go | 6 ++-- internal/push/offlinepush/getui/push.go | 14 +++++----- .../offlinepush/jpush/body/notification.go | 4 +-- internal/push/offlinepush/jpush/push.go | 15 +++++----- internal/push/push_handler.go | 20 ++++++------- internal/push/push_rpc_server.go | 16 +++++------ internal/push/push_to_client.go | 20 ++++++------- pkg/common/db/controller/msg.go | 26 ++++++++--------- pkg/common/db/unrelation/mongo.go | 28 +++++++++---------- pkg/common/prommetrics/prommetrics.go | 10 +++---- 16 files changed, 124 insertions(+), 127 deletions(-) diff --git a/internal/msgtransfer/init.go b/internal/msgtransfer/init.go index 188433c30..6f3d2fd3a 100644 --- a/internal/msgtransfer/init.go +++ b/internal/msgtransfer/init.go @@ -51,7 +51,6 @@ type MsgTransfer struct { // and handle the deletion notification message deleted subscriptions topic: msg_to_mongo ctx context.Context cancel context.CancelFunc - config *config.GlobalConfig } func StartTransfer(config *config.GlobalConfig, prometheusPort int) error { @@ -60,7 +59,7 @@ func StartTransfer(config *config.GlobalConfig, prometheusPort int) error { return err } - mongo, err := unrelation.NewMongo(config) + mongo, err := unrelation.NewMongo(&config.Mongo) if err != nil { return err } @@ -78,15 +77,15 @@ func StartTransfer(config *config.GlobalConfig, prometheusPort int) error { } client.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, "round_robin"))) - msgModel := cache.NewMsgCacheModel(rdb, config) + msgModel := cache.NewMsgCacheModel(rdb, config.MsgCacheTimeout, &config.Redis) msgDocModel := unrelation.NewMsgMongoDriver(mongo.GetDatabase(config.Mongo.Database)) - msgDatabase, err := controller.NewCommonMsgDatabase(msgDocModel, msgModel, config) + msgDatabase, err := controller.NewCommonMsgDatabase(msgDocModel, msgModel, &config.Kafka) if err != nil { return err } - conversationRpcClient := rpcclient.NewConversationRpcClient(client, config) - groupRpcClient := rpcclient.NewGroupRpcClient(client, config) - msgTransfer, err := NewMsgTransfer(config, msgDatabase, &conversationRpcClient, &groupRpcClient) + conversationRpcClient := rpcclient.NewConversationRpcClient(client, config.RpcRegisterName.OpenImConversationName) + groupRpcClient := rpcclient.NewGroupRpcClient(client, config.RpcRegisterName.OpenImGroupName) + msgTransfer, err := NewMsgTransfer(&config.Kafka, msgDatabase, &conversationRpcClient, &groupRpcClient) if err != nil { return err } @@ -94,16 +93,16 @@ func StartTransfer(config *config.GlobalConfig, prometheusPort int) error { } func NewMsgTransfer( - config *config.GlobalConfig, + kafkaConf *config.Kafka, msgDatabase controller.CommonMsgDatabase, conversationRpcClient *rpcclient.ConversationRpcClient, groupRpcClient *rpcclient.GroupRpcClient, ) (*MsgTransfer, error) { - historyCH, err := NewOnlineHistoryRedisConsumerHandler(config, msgDatabase, conversationRpcClient, groupRpcClient) + historyCH, err := NewOnlineHistoryRedisConsumerHandler(kafkaConf, msgDatabase, conversationRpcClient, groupRpcClient) if err != nil { return nil, err } - historyMongoCH, err := NewOnlineHistoryMongoConsumerHandler(config, msgDatabase) + historyMongoCH, err := NewOnlineHistoryMongoConsumerHandler(kafkaConf, msgDatabase) if err != nil { return nil, err } @@ -111,7 +110,6 @@ func NewMsgTransfer( return &MsgTransfer{ historyCH: historyCH, historyMongoCH: historyMongoCH, - config: config, }, nil } @@ -136,7 +134,7 @@ func (m *MsgTransfer) Start(prometheusPort int, config *config.GlobalConfig) err proreg.MustRegister( collectors.NewGoCollector(), ) - proreg.MustRegister(prommetrics.GetGrpcCusMetrics("Transfer", config)...) + proreg.MustRegister(prommetrics.GetGrpcCusMetrics("Transfer", &config.RpcRegisterName)...) http.Handle("/metrics", promhttp.HandlerFor(proreg, promhttp.HandlerOpts{Registry: proreg})) err := http.ListenAndServe(fmt.Sprintf(":%d", prometheusPort), nil) if err != nil && err != http.ErrServerClosed { diff --git a/internal/msgtransfer/online_history_msg_handler.go b/internal/msgtransfer/online_history_msg_handler.go index 599fdaeee..a0d68a78a 100644 --- a/internal/msgtransfer/online_history_msg_handler.go +++ b/internal/msgtransfer/online_history_msg_handler.go @@ -82,7 +82,7 @@ type OnlineHistoryRedisConsumerHandler struct { } func NewOnlineHistoryRedisConsumerHandler( - config *config.GlobalConfig, + kafkaConf *config.Kafka, database controller.CommonMsgDatabase, conversationRpcClient *rpcclient.ConversationRpcClient, groupRpcClient *rpcclient.GroupRpcClient, @@ -100,12 +100,12 @@ func NewOnlineHistoryRedisConsumerHandler( var err error var tlsConfig *kafka.TLSConfig - if config.Kafka.TLS != nil { + if kafkaConf.TLS != nil { tlsConfig = &kafka.TLSConfig{ - CACrt: config.Kafka.TLS.CACrt, - ClientCrt: config.Kafka.TLS.ClientCrt, - ClientKey: config.Kafka.TLS.ClientKey, - ClientKeyPwd: config.Kafka.TLS.ClientKeyPwd, + CACrt: kafkaConf.TLS.CACrt, + ClientCrt: kafkaConf.TLS.ClientCrt, + ClientKey: kafkaConf.TLS.ClientKey, + ClientKeyPwd: kafkaConf.TLS.ClientKeyPwd, InsecureSkipVerify: false, } } @@ -114,11 +114,11 @@ func NewOnlineHistoryRedisConsumerHandler( KafkaVersion: sarama.V2_0_0_0, OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false, - UserName: config.Kafka.Username, - Password: config.Kafka.Password, - }, []string{config.Kafka.LatestMsgToRedis.Topic}, - config.Kafka.Addr, - config.Kafka.ConsumerGroupID.MsgToRedis, + UserName: kafkaConf.Username, + Password: kafkaConf.Password, + }, []string{kafkaConf.LatestMsgToRedis.Topic}, + kafkaConf.Addr, + kafkaConf.ConsumerGroupID.MsgToRedis, tlsConfig, ) // statistics.NewStatistics(&och.singleMsgSuccessCount, config.Config.ModuleName.MsgTransferName, fmt.Sprintf("%d diff --git a/internal/msgtransfer/online_msg_to_mongo_handler.go b/internal/msgtransfer/online_msg_to_mongo_handler.go index f420d74b7..471c9727c 100644 --- a/internal/msgtransfer/online_msg_to_mongo_handler.go +++ b/internal/msgtransfer/online_msg_to_mongo_handler.go @@ -33,14 +33,14 @@ type OnlineHistoryMongoConsumerHandler struct { msgDatabase controller.CommonMsgDatabase } -func NewOnlineHistoryMongoConsumerHandler(config *config.GlobalConfig, database controller.CommonMsgDatabase) (*OnlineHistoryMongoConsumerHandler, error) { +func NewOnlineHistoryMongoConsumerHandler(kafkaConf *config.Kafka, database controller.CommonMsgDatabase) (*OnlineHistoryMongoConsumerHandler, error) { var tlsConfig *kfk.TLSConfig - if config.Kafka.TLS != nil { + if kafkaConf.TLS != nil { tlsConfig = &kfk.TLSConfig{ - CACrt: config.Kafka.TLS.CACrt, - ClientCrt: config.Kafka.TLS.ClientCrt, - ClientKey: config.Kafka.TLS.ClientKey, - ClientKeyPwd: config.Kafka.TLS.ClientKeyPwd, + CACrt: kafkaConf.TLS.CACrt, + ClientCrt: kafkaConf.TLS.ClientCrt, + ClientKey: kafkaConf.TLS.ClientKey, + ClientKeyPwd: kafkaConf.TLS.ClientKeyPwd, InsecureSkipVerify: false, } } @@ -48,11 +48,11 @@ func NewOnlineHistoryMongoConsumerHandler(config *config.GlobalConfig, database KafkaVersion: sarama.V2_0_0_0, OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false, - UserName: config.Kafka.Username, - Password: config.Kafka.Password, - }, []string{config.Kafka.MsgToMongo.Topic}, - config.Kafka.Addr, - config.Kafka.ConsumerGroupID.MsgToMongo, + UserName: kafkaConf.Username, + Password: kafkaConf.Password, + }, []string{kafkaConf.MsgToMongo.Topic}, + kafkaConf.Addr, + kafkaConf.ConsumerGroupID.MsgToMongo, tlsConfig, ) if err != nil { diff --git a/internal/push/callback.go b/internal/push/callback.go index 5fd906d44..c33aec6a1 100644 --- a/internal/push/callback.go +++ b/internal/push/callback.go @@ -29,12 +29,12 @@ import ( func callbackOfflinePush( ctx context.Context, - config *config.GlobalConfig, + callback *config.Callback, userIDs []string, msg *sdkws.MsgData, offlinePushUserIDs *[]string, ) error { - if !config.Callback.CallbackOfflinePush.Enable || msg.ContentType == constant.Typing { + if !callback.CallbackOfflinePush.Enable || msg.ContentType == constant.Typing { return nil } req := &callbackstruct.CallbackBeforePushReq{ @@ -58,7 +58,7 @@ func callbackOfflinePush( } resp := &callbackstruct.CallbackBeforePushResp{} - if err := http.CallBackPostReturn(ctx, config.Callback.CallbackUrl, req, resp, config.Callback.CallbackOfflinePush); err != nil { + if err := http.CallBackPostReturn(ctx, callback.CallbackUrl, req, resp, callback.CallbackOfflinePush); err != nil { return err } @@ -71,8 +71,8 @@ func callbackOfflinePush( return nil } -func callbackOnlinePush(ctx context.Context, config *config.GlobalConfig, userIDs []string, msg *sdkws.MsgData) error { - if !config.Callback.CallbackOnlinePush.Enable || utils.Contain(msg.SendID, userIDs...) || msg.ContentType == constant.Typing { +func callbackOnlinePush(ctx context.Context, callback *config.Callback, userIDs []string, msg *sdkws.MsgData) error { + if !callback.CallbackOnlinePush.Enable || utils.Contain(msg.SendID, userIDs...) || msg.ContentType == constant.Typing { return nil } req := callbackstruct.CallbackBeforePushReq{ @@ -94,7 +94,7 @@ func callbackOnlinePush(ctx context.Context, config *config.GlobalConfig, userID Content: GetContent(msg), } resp := &callbackstruct.CallbackBeforePushResp{} - if err := http.CallBackPostReturn(ctx, config.Callback.CallbackUrl, req, resp, config.Callback.CallbackOnlinePush); err != nil { + if err := http.CallBackPostReturn(ctx, callback.CallbackUrl, req, resp, callback.CallbackOnlinePush); err != nil { return err } return nil @@ -102,12 +102,12 @@ func callbackOnlinePush(ctx context.Context, config *config.GlobalConfig, userID func callbackBeforeSuperGroupOnlinePush( ctx context.Context, - config *config.GlobalConfig, + callback *config.Callback, groupID string, msg *sdkws.MsgData, pushToUserIDs *[]string, ) error { - if !config.Callback.CallbackBeforeSuperGroupOnlinePush.Enable || msg.ContentType == constant.Typing { + if !callback.CallbackBeforeSuperGroupOnlinePush.Enable || msg.ContentType == constant.Typing { return nil } req := callbackstruct.CallbackBeforeSuperGroupOnlinePushReq{ @@ -127,7 +127,7 @@ func callbackBeforeSuperGroupOnlinePush( Seq: msg.Seq, } resp := &callbackstruct.CallbackBeforeSuperGroupOnlinePushResp{} - if err := http.CallBackPostReturn(ctx, config.Callback.CallbackUrl, req, resp, config.Callback.CallbackBeforeSuperGroupOnlinePush); err != nil { + if err := http.CallBackPostReturn(ctx, callback.CallbackUrl, req, resp, callback.CallbackBeforeSuperGroupOnlinePush); err != nil { return err } diff --git a/internal/push/consumer_init.go b/internal/push/consumer_init.go index 351b63f46..3b401735e 100644 --- a/internal/push/consumer_init.go +++ b/internal/push/consumer_init.go @@ -26,8 +26,8 @@ type Consumer struct { // successCount uint64 } -func NewConsumer(config *config.GlobalConfig, pusher *Pusher) (*Consumer, error) { - c, err := NewConsumerHandler(config, pusher) +func NewConsumer(kafkaConf *config.Kafka, pusher *Pusher) (*Consumer, error) { + c, err := NewConsumerHandler(kafkaConf, pusher) if err != nil { return nil, err } diff --git a/internal/push/offlinepush/fcm/push.go b/internal/push/offlinepush/fcm/push.go index f34d9a6a4..81d727f46 100644 --- a/internal/push/offlinepush/fcm/push.go +++ b/internal/push/offlinepush/fcm/push.go @@ -40,9 +40,9 @@ type Fcm struct { // NewClient initializes a new FCM client using the Firebase Admin SDK. // It requires the FCM service account credentials file located within the project's configuration directory. -func NewClient(globalConfig *config.GlobalConfig, cache cache.MsgModel) *Fcm { +func NewClient(pushConf *config.Push, cache cache.MsgModel) *Fcm { projectRoot := config.GetProjectRoot() - credentialsFilePath := filepath.Join(projectRoot, "config", globalConfig.Push.Fcm.ServiceAccount) + credentialsFilePath := filepath.Join(projectRoot, "config", pushConf.Fcm.ServiceAccount) opt := option.WithCredentialsFile(credentialsFilePath) fcmApp, err := firebase.NewApp(context.Background(), nil, opt) if err != nil { diff --git a/internal/push/offlinepush/getui/body.go b/internal/push/offlinepush/getui/body.go index 46479163f..a96ff4efc 100644 --- a/internal/push/offlinepush/getui/body.go +++ b/internal/push/offlinepush/getui/body.go @@ -133,13 +133,13 @@ type Payload struct { IsSignal bool `json:"isSignal"` } -func newPushReq(config *config.GlobalConfig, title, content string) PushReq { +func newPushReq(pushConf *config.Push, title, content string) PushReq { pushReq := PushReq{PushMessage: &PushMessage{Notification: &Notification{ Title: title, Body: content, ClickType: "startapp", - ChannelID: config.Push.GeTui.ChannelID, - ChannelName: config.Push.GeTui.ChannelName, + ChannelID: pushConf.GeTui.ChannelID, + ChannelName: pushConf.GeTui.ChannelName, }}} return pushReq } diff --git a/internal/push/offlinepush/getui/push.go b/internal/push/offlinepush/getui/push.go index 1934ab61e..1032ff4ea 100644 --- a/internal/push/offlinepush/getui/push.go +++ b/internal/push/offlinepush/getui/push.go @@ -56,14 +56,14 @@ type Client struct { cache cache.MsgModel tokenExpireTime int64 taskIDTTL int64 - config *config.GlobalConfig + pushConf *config.Push } -func NewClient(config *config.GlobalConfig, cache cache.MsgModel) *Client { +func NewClient(pushConf *config.Push, cache cache.MsgModel) *Client { return &Client{cache: cache, tokenExpireTime: tokenExpireTime, taskIDTTL: taskIDTTL, - config: config, + pushConf: pushConf, } } @@ -80,7 +80,7 @@ func (g *Client) Push(ctx context.Context, userIDs []string, title, content stri return err } } - pushReq := newPushReq(g.config, title, content) + pushReq := newPushReq(g.pushConf, title, content) pushReq.setPushChannel(title, content) if len(userIDs) > 1 { maxNum := 999 @@ -116,13 +116,13 @@ func (g *Client) Push(ctx context.Context, userIDs []string, title, content stri func (g *Client) Auth(ctx context.Context, timeStamp int64) (token string, expireTime int64, err error) { h := sha256.New() h.Write( - []byte(g.config.Push.GeTui.AppKey + strconv.Itoa(int(timeStamp)) + g.config.Push.GeTui.MasterSecret), + []byte(g.pushConf.GeTui.AppKey + strconv.Itoa(int(timeStamp)) + g.pushConf.GeTui.MasterSecret), ) sign := hex.EncodeToString(h.Sum(nil)) reqAuth := AuthReq{ Sign: sign, Timestamp: strconv.Itoa(int(timeStamp)), - AppKey: g.config.Push.GeTui.AppKey, + AppKey: g.pushConf.GeTui.AppKey, } respAuth := AuthResp{} err = g.request(ctx, authURL, reqAuth, "", &respAuth) @@ -165,7 +165,7 @@ func (g *Client) request(ctx context.Context, url string, input any, token strin header := map[string]string{"token": token} resp := &Resp{} resp.Data = output - return g.postReturn(ctx, g.config.Push.GeTui.PushUrl+url, header, input, resp, 3) + return g.postReturn(ctx, g.pushConf.GeTui.PushUrl+url, header, input, resp, 3) } func (g *Client) postReturn( diff --git a/internal/push/offlinepush/jpush/body/notification.go b/internal/push/offlinepush/jpush/body/notification.go index b25882ea5..0f68e9a68 100644 --- a/internal/push/offlinepush/jpush/body/notification.go +++ b/internal/push/offlinepush/jpush/body/notification.go @@ -56,8 +56,8 @@ func (n *Notification) SetExtras(extras Extras) { n.Android.Extras = extras } -func (n *Notification) SetAndroidIntent(config *config.GlobalConfig) { - n.Android.Intent.URL = config.Push.Jpns.PushIntent +func (n *Notification) SetAndroidIntent(pushConf *config.Push) { + n.Android.Intent.URL = pushConf.Jpns.PushIntent } func (n *Notification) IOSEnableMutableContent() { diff --git a/internal/push/offlinepush/jpush/push.go b/internal/push/offlinepush/jpush/push.go index 2ced4bfd3..b9b0d923c 100644 --- a/internal/push/offlinepush/jpush/push.go +++ b/internal/push/offlinepush/jpush/push.go @@ -26,11 +26,12 @@ import ( ) type JPush struct { - config *config.GlobalConfig + pushConf *config.Push + iOSPushConf *config.IOSPush } -func NewClient(config *config.GlobalConfig) *JPush { - return &JPush{config: config} +func NewClient(pushConf *config.Push, iOSPushConf *config.IOSPush) *JPush { + return &JPush{pushConf: pushConf, iOSPushConf: iOSPushConf} } func (j *JPush) Auth(apiKey, secretKey string, timeStamp int64) (token string, err error) { @@ -61,12 +62,12 @@ func (j *JPush) Push(ctx context.Context, userIDs []string, title, content strin no.IOSEnableMutableContent() no.SetExtras(extras) no.SetAlert(title) - no.SetAndroidIntent(j.config) + no.SetAndroidIntent(j.pushConf) var msg body.Message msg.SetMsgContent(content) var opt body.Options - opt.SetApnsProduction(j.config.IOSPush.Production) + opt.SetApnsProduction(j.iOSPushConf.Production) var pushObj body.PushObj pushObj.SetPlatform(&pf) pushObj.SetAudience(&au) @@ -80,9 +81,9 @@ func (j *JPush) Push(ctx context.Context, userIDs []string, title, content strin func (j *JPush) request(ctx context.Context, po body.PushObj, resp any, timeout int) error { return http2.PostReturn( ctx, - j.config.Push.Jpns.PushUrl, + j.pushConf.Jpns.PushUrl, map[string]string{ - "Authorization": j.getAuthorization(j.config.Push.Jpns.AppKey, j.config.Push.Jpns.MasterSecret), + "Authorization": j.getAuthorization(j.pushConf.Jpns.AppKey, j.pushConf.Jpns.MasterSecret), }, po, resp, diff --git a/internal/push/push_handler.go b/internal/push/push_handler.go index 3d1392611..b5a5c545a 100644 --- a/internal/push/push_handler.go +++ b/internal/push/push_handler.go @@ -34,17 +34,17 @@ type ConsumerHandler struct { pusher *Pusher } -func NewConsumerHandler(config *config.GlobalConfig, pusher *Pusher) (*ConsumerHandler, error) { +func NewConsumerHandler(kafkaConf *config.Kafka, pusher *Pusher) (*ConsumerHandler, error) { var consumerHandler ConsumerHandler consumerHandler.pusher = pusher var err error var tlsConfig *kfk.TLSConfig - if config.Kafka.TLS != nil { + if kafkaConf.TLS != nil { tlsConfig = &kfk.TLSConfig{ - CACrt: config.Kafka.TLS.CACrt, - ClientCrt: config.Kafka.TLS.ClientCrt, - ClientKey: config.Kafka.TLS.ClientKey, - ClientKeyPwd: config.Kafka.TLS.ClientKeyPwd, + CACrt: kafkaConf.TLS.CACrt, + ClientCrt: kafkaConf.TLS.ClientCrt, + ClientKey: kafkaConf.TLS.ClientKey, + ClientKeyPwd: kafkaConf.TLS.ClientKeyPwd, InsecureSkipVerify: false, } } @@ -52,10 +52,10 @@ func NewConsumerHandler(config *config.GlobalConfig, pusher *Pusher) (*ConsumerH KafkaVersion: sarama.V2_0_0_0, OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false, - UserName: config.Kafka.Username, - Password: config.Kafka.Password, - }, []string{config.Kafka.MsgToPush.Topic}, config.Kafka.Addr, - config.Kafka.ConsumerGroupID.MsgToPush, + UserName: kafkaConf.Username, + Password: kafkaConf.Password, + }, []string{kafkaConf.MsgToPush.Topic}, kafkaConf.Addr, + kafkaConf.ConsumerGroupID.MsgToPush, tlsConfig) if err != nil { return nil, err diff --git a/internal/push/push_rpc_server.go b/internal/push/push_rpc_server.go index fca4ffc25..5498ef8ec 100644 --- a/internal/push/push_rpc_server.go +++ b/internal/push/push_rpc_server.go @@ -33,20 +33,19 @@ import ( type pushServer struct { pusher *Pusher - config *config.GlobalConfig } func Start(config *config.GlobalConfig, client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) error { - rdb, err := cache.NewRedis(config) + rdb, err := cache.NewRedis(&config.Redis) if err != nil { return err } - cacheModel := cache.NewMsgCacheModel(rdb, config) - offlinePusher := NewOfflinePusher(config, cacheModel) + cacheModel := cache.NewMsgCacheModel(rdb, config.MsgCacheTimeout, &config.Redis) + offlinePusher := NewOfflinePusher(&config.Push, &config.IOSPush, cacheModel) database := controller.NewPushDatabase(cacheModel) - groupRpcClient := rpcclient.NewGroupRpcClient(client, config) - conversationRpcClient := rpcclient.NewConversationRpcClient(client, config) - msgRpcClient := rpcclient.NewMessageRpcClient(client, config) + groupRpcClient := rpcclient.NewGroupRpcClient(client, config.RpcRegisterName.OpenImGroupName) + conversationRpcClient := rpcclient.NewConversationRpcClient(client, config.RpcRegisterName.OpenImConversationName) + msgRpcClient := rpcclient.NewMessageRpcClient(client, config.RpcRegisterName.OpenImMsgName) pusher := NewPusher( config, client, @@ -61,10 +60,9 @@ func Start(config *config.GlobalConfig, client discoveryregistry.SvcDiscoveryReg pbpush.RegisterPushMsgServiceServer(server, &pushServer{ pusher: pusher, - config: config, }) - consumer, err := NewConsumer(config, pusher) + consumer, err := NewConsumer(&config.Kafka, pusher) if err != nil { return err } diff --git a/internal/push/push_to_client.go b/internal/push/push_to_client.go index f6a7fc10a..b549bf91a 100644 --- a/internal/push/push_to_client.go +++ b/internal/push/push_to_client.go @@ -76,15 +76,15 @@ func NewPusher(config *config.GlobalConfig, discov discoveryregistry.SvcDiscover } } -func NewOfflinePusher(config *config.GlobalConfig, cache cache.MsgModel) offlinepush.OfflinePusher { +func NewOfflinePusher(pushConf *config.Push, iOSPushConf *config.IOSPush, cache cache.MsgModel) offlinepush.OfflinePusher { var offlinePusher offlinepush.OfflinePusher - switch config.Push.Enable { + switch pushConf.Enable { case "getui": - offlinePusher = getui.NewClient(config, cache) + offlinePusher = getui.NewClient(pushConf, cache) case "fcm": - offlinePusher = fcm.NewClient(config, cache) + offlinePusher = fcm.NewClient(pushConf, cache) case "jpush": - offlinePusher = jpush.NewClient(config) + offlinePusher = jpush.NewClient(pushConf, iOSPushConf) default: offlinePusher = dummy.NewClient() } @@ -102,7 +102,7 @@ func (p *Pusher) DeleteMemberAndSetConversationSeq(ctx context.Context, groupID func (p *Pusher) Push2User(ctx context.Context, userIDs []string, msg *sdkws.MsgData) error { log.ZDebug(ctx, "Get msg from msg_transfer And push msg", "userIDs", userIDs, "msg", msg.String()) - if err := callbackOnlinePush(ctx, p.config, userIDs, msg); err != nil { + if err := callbackOnlinePush(ctx, &p.config.Callback, userIDs, msg); err != nil { return err } // push @@ -130,7 +130,7 @@ func (p *Pusher) Push2User(ctx context.Context, userIDs []string, msg *sdkws.Msg }) if len(offlinePushUserIDList) > 0 { - if err = callbackOfflinePush(ctx, p.config, offlinePushUserIDList, msg, &[]string{}); err != nil { + if err = callbackOfflinePush(ctx, &p.config.Callback, offlinePushUserIDList, msg, &[]string{}); err != nil { return err } err = p.offlinePushMsg(ctx, msg.SendID, msg, offlinePushUserIDList) @@ -163,7 +163,7 @@ func (p *Pusher) k8sOfflinePush2SuperGroup(ctx context.Context, groupID string, } if len(needOfflinePushUserIDs) > 0 { var offlinePushUserIDs []string - err := callbackOfflinePush(ctx, p.config, needOfflinePushUserIDs, msg, &offlinePushUserIDs) + err := callbackOfflinePush(ctx, &p.config.Callback, needOfflinePushUserIDs, msg, &offlinePushUserIDs) if err != nil { return err } @@ -194,7 +194,7 @@ func (p *Pusher) k8sOfflinePush2SuperGroup(ctx context.Context, groupID string, func (p *Pusher) Push2SuperGroup(ctx context.Context, groupID string, msg *sdkws.MsgData) (err error) { log.ZDebug(ctx, "Get super group msg from msg_transfer and push msg", "msg", msg.String(), "groupID", groupID) var pushToUserIDs []string - if err = callbackBeforeSuperGroupOnlinePush(ctx, p.config, groupID, msg, &pushToUserIDs); err != nil { + if err = callbackBeforeSuperGroupOnlinePush(ctx, &p.config.Callback, groupID, msg, &pushToUserIDs); err != nil { return err } @@ -299,7 +299,7 @@ func (p *Pusher) Push2SuperGroup(ctx context.Context, groupID string, msg *sdkws // Use offline push messaging if len(needOfflinePushUserIDs) > 0 { var offlinePushUserIDs []string - err = callbackOfflinePush(ctx, p.config, needOfflinePushUserIDs, msg, &offlinePushUserIDs) + err = callbackOfflinePush(ctx, &p.config.Callback, needOfflinePushUserIDs, msg, &offlinePushUserIDs) if err != nil { return err } diff --git a/pkg/common/db/controller/msg.go b/pkg/common/db/controller/msg.go index f563af150..bb160492f 100644 --- a/pkg/common/db/controller/msg.go +++ b/pkg/common/db/controller/msg.go @@ -121,33 +121,33 @@ type CommonMsgDatabase interface { ConvertMsgsDocLen(ctx context.Context, conversationIDs []string) } -func NewCommonMsgDatabase(msgDocModel unrelationtb.MsgDocModelInterface, cacheModel cache.MsgModel, config *config.GlobalConfig) (CommonMsgDatabase, error) { +func NewCommonMsgDatabase(msgDocModel unrelationtb.MsgDocModelInterface, cacheModel cache.MsgModel, kafkaConf *config.Kafka) (CommonMsgDatabase, error) { producerConfig := &kafka.ProducerConfig{ - ProducerAck: config.Kafka.ProducerAck, - CompressType: config.Kafka.CompressType, - Username: config.Kafka.Username, - Password: config.Kafka.Password, + ProducerAck: kafkaConf.ProducerAck, + CompressType: kafkaConf.CompressType, + Username: kafkaConf.Username, + Password: kafkaConf.Password, } var tlsConfig *kafka.TLSConfig - if config.Kafka.TLS != nil { + if kafkaConf.TLS != nil { tlsConfig = &kafka.TLSConfig{ - CACrt: config.Kafka.TLS.CACrt, - ClientCrt: config.Kafka.TLS.ClientCrt, - ClientKey: config.Kafka.TLS.ClientKey, - ClientKeyPwd: config.Kafka.TLS.ClientKeyPwd, + CACrt: kafkaConf.TLS.CACrt, + ClientCrt: kafkaConf.TLS.ClientCrt, + ClientKey: kafkaConf.TLS.ClientKey, + ClientKeyPwd: kafkaConf.TLS.ClientKeyPwd, InsecureSkipVerify: false, } } - producerToRedis, err := kafka.NewKafkaProducer(config.Kafka.Addr, config.Kafka.LatestMsgToRedis.Topic, producerConfig, tlsConfig) + producerToRedis, err := kafka.NewKafkaProducer(kafkaConf.Addr, kafkaConf.LatestMsgToRedis.Topic, producerConfig, tlsConfig) if err != nil { return nil, err } - producerToMongo, err := kafka.NewKafkaProducer(config.Kafka.Addr, config.Kafka.MsgToMongo.Topic, producerConfig, tlsConfig) + producerToMongo, err := kafka.NewKafkaProducer(kafkaConf.Addr, kafkaConf.MsgToMongo.Topic, producerConfig, tlsConfig) if err != nil { return nil, err } - producerToPush, err := kafka.NewKafkaProducer(config.Kafka.Addr, config.Kafka.MsgToPush.Topic, producerConfig, tlsConfig) + producerToPush, err := kafka.NewKafkaProducer(kafkaConf.Addr, kafkaConf.MsgToPush.Topic, producerConfig, tlsConfig) if err != nil { return nil, err } diff --git a/pkg/common/db/unrelation/mongo.go b/pkg/common/db/unrelation/mongo.go index 7bc1ec1de..1a48561ac 100644 --- a/pkg/common/db/unrelation/mongo.go +++ b/pkg/common/db/unrelation/mongo.go @@ -37,14 +37,14 @@ const ( ) type Mongo struct { - db *mongo.Client - config *config.GlobalConfig + db *mongo.Client + mongoConf *config.Mongo } // NewMongo Initialize MongoDB connection. -func NewMongo(config *config.GlobalConfig) (*Mongo, error) { +func NewMongo(mongoConf *config.Mongo) (*Mongo, error) { specialerror.AddReplace(mongo.ErrNoDocuments, errs.ErrRecordNotFound) - uri := buildMongoURI(config) + uri := buildMongoURI(mongoConf) var mongoClient *mongo.Client var err error @@ -58,7 +58,7 @@ func NewMongo(config *config.GlobalConfig) (*Mongo, error) { if err = mongoClient.Ping(ctx, nil); err != nil { return nil, errs.Wrap(err, uri) } - return &Mongo{db: mongoClient, config: config}, nil + return &Mongo{db: mongoClient, mongoConf: mongoConf}, nil } if shouldRetry(err) { time.Sleep(time.Second) // exponential backoff could be implemented here @@ -68,14 +68,14 @@ func NewMongo(config *config.GlobalConfig) (*Mongo, error) { return nil, errs.Wrap(err, uri) } -func buildMongoURI(config *config.GlobalConfig) string { +func buildMongoURI(mongoConf *config.Mongo) string { uri := os.Getenv("MONGO_URI") if uri != "" { return uri } - if config.Mongo.Uri != "" { - return config.Mongo.Uri + if mongoConf.Uri != "" { + return mongoConf.Uri } username := os.Getenv("MONGO_OPENIM_USERNAME") @@ -86,21 +86,21 @@ func buildMongoURI(config *config.GlobalConfig) string { maxPoolSize := os.Getenv("MONGO_MAX_POOL_SIZE") if username == "" { - username = config.Mongo.Username + username = mongoConf.Username } if password == "" { - password = config.Mongo.Password + password = mongoConf.Password } if address == "" { - address = strings.Join(config.Mongo.Address, ",") + address = strings.Join(mongoConf.Address, ",") } else if port != "" { address = fmt.Sprintf("%s:%s", address, port) } if database == "" { - database = config.Mongo.Database + database = mongoConf.Database } if maxPoolSize == "" { - maxPoolSize = fmt.Sprint(config.Mongo.MaxPoolSize) + maxPoolSize = fmt.Sprint(mongoConf.MaxPoolSize) } if username != "" && password != "" { @@ -134,7 +134,7 @@ func (m *Mongo) CreateMsgIndex() error { // createMongoIndex creates an index in a MongoDB collection. func (m *Mongo) createMongoIndex(collection string, isUnique bool, keys ...string) error { - db := m.GetDatabase(m.config.Mongo.Database).Collection(collection) + db := m.GetDatabase(m.mongoConf.Database).Collection(collection) opts := options.CreateIndexes().SetMaxTime(10 * time.Second) indexView := db.Indexes() diff --git a/pkg/common/prommetrics/prommetrics.go b/pkg/common/prommetrics/prommetrics.go index 710339cca..6553eaaad 100644 --- a/pkg/common/prommetrics/prommetrics.go +++ b/pkg/common/prommetrics/prommetrics.go @@ -32,17 +32,17 @@ func NewGrpcPromObj(cusMetrics []prometheus.Collector) (*prometheus.Registry, *g return reg, grpcMetrics, nil } -func GetGrpcCusMetrics(registerName string, config *config2.GlobalConfig) []prometheus.Collector { +func GetGrpcCusMetrics(registerName string, rpcRegisterName *config2.RpcRegisterName) []prometheus.Collector { switch registerName { - case config.RpcRegisterName.OpenImMessageGatewayName: + case rpcRegisterName.OpenImMessageGatewayName: return []prometheus.Collector{OnlineUserGauge} - case config.RpcRegisterName.OpenImMsgName: + case rpcRegisterName.OpenImMsgName: return []prometheus.Collector{SingleChatMsgProcessSuccessCounter, SingleChatMsgProcessFailedCounter, GroupChatMsgProcessSuccessCounter, GroupChatMsgProcessFailedCounter} case "Transfer": return []prometheus.Collector{MsgInsertRedisSuccessCounter, MsgInsertRedisFailedCounter, MsgInsertMongoSuccessCounter, MsgInsertMongoFailedCounter, SeqSetFailedCounter} - case config.RpcRegisterName.OpenImPushName: + case rpcRegisterName.OpenImPushName: return []prometheus.Collector{MsgOfflinePushFailedCounter} - case config.RpcRegisterName.OpenImAuthName: + case rpcRegisterName.OpenImAuthName: return []prometheus.Collector{UserLoginCounter} default: return nil