From 79ee07a13f4c0f33c74e28d17fcef722806294a8 Mon Sep 17 00:00:00 2001 From: Gordon <1432970085@qq.com> Date: Thu, 22 Feb 2024 21:36:43 +0800 Subject: [PATCH] optimization: config file changed to dependency injection. --- internal/msggateway/callback.go | 22 ++++------ internal/msggateway/hub_server.go | 4 +- internal/msggateway/init.go | 1 + internal/msggateway/n_ws_server.go | 14 +++--- internal/msgtransfer/init.go | 16 +++---- .../msgtransfer/online_history_msg_handler.go | 5 ++- .../online_msg_to_mongo_handler.go | 6 +-- internal/push/callback.go | 20 ++++----- internal/push/consumer_init.go | 5 ++- internal/push/offlinepush/fcm/push.go | 4 +- internal/push/offlinepush/getui/body.go | 6 +-- internal/push/offlinepush/getui/push.go | 17 ++++--- .../offlinepush/jpush/body/notification.go | 5 +-- internal/push/offlinepush/jpush/push.go | 16 ++++--- internal/push/push_handler.go | 6 +-- internal/push/push_rpc_server.go | 7 +-- internal/push/push_to_client.go | 44 ++++++++++--------- pkg/common/db/cache/init_redis.go | 21 ++++----- 18 files changed, 115 insertions(+), 104 deletions(-) diff --git a/internal/msggateway/callback.go b/internal/msggateway/callback.go index 7d5381754..660404ab4 100644 --- a/internal/msggateway/callback.go +++ b/internal/msggateway/callback.go @@ -26,12 +26,8 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/common/http" ) -func callBackURL() string { - return config.Config.Callback.CallbackUrl -} - -func CallbackUserOnline(ctx context.Context, userID string, platformID int, isAppBackground bool, connID string) error { - if !config.Config.Callback.CallbackUserOnline.Enable { +func CallbackUserOnline(ctx context.Context, globalConfig *config.GlobalConfig, userID string, platformID int, isAppBackground bool, connID string) error { + if !globalConfig.Callback.CallbackUserOnline.Enable { return nil } req := cbapi.CallbackUserOnlineReq{ @@ -49,14 +45,14 @@ func CallbackUserOnline(ctx context.Context, userID string, platformID int, isAp ConnID: connID, } resp := cbapi.CommonCallbackResp{} - if err := http.CallBackPostReturn(ctx, callBackURL(), &req, &resp, config.Config.Callback.CallbackUserOnline); err != nil { + if err := http.CallBackPostReturn(ctx, globalConfig.Callback.CallbackUrl, &req, &resp, globalConfig.Callback.CallbackUserOnline); err != nil { return err } return nil } -func CallbackUserOffline(ctx context.Context, userID string, platformID int, connID string) error { - if !config.Config.Callback.CallbackUserOffline.Enable { +func CallbackUserOffline(ctx context.Context, globalConfig *config.GlobalConfig, userID string, platformID int, connID string) error { + if !globalConfig.Callback.CallbackUserOffline.Enable { return nil } req := &cbapi.CallbackUserOfflineReq{ @@ -73,14 +69,14 @@ func CallbackUserOffline(ctx context.Context, userID string, platformID int, con ConnID: connID, } resp := &cbapi.CallbackUserOfflineResp{} - if err := http.CallBackPostReturn(ctx, callBackURL(), req, resp, config.Config.Callback.CallbackUserOffline); err != nil { + if err := http.CallBackPostReturn(ctx, globalConfig.Callback.CallbackUrl, req, resp, globalConfig.Callback.CallbackUserOffline); err != nil { return err } return nil } -func CallbackUserKickOff(ctx context.Context, userID string, platformID int) error { - if !config.Config.Callback.CallbackUserKickOff.Enable { +func CallbackUserKickOff(ctx context.Context, globalConfig *config.GlobalConfig, userID string, platformID int) error { + if !globalConfig.Callback.CallbackUserKickOff.Enable { return nil } req := &cbapi.CallbackUserKickOffReq{ @@ -96,7 +92,7 @@ func CallbackUserKickOff(ctx context.Context, userID string, platformID int) err Seq: time.Now().UnixMilli(), } resp := &cbapi.CommonCallbackResp{} - if err := http.CallBackPostReturn(ctx, callBackURL(), req, resp, config.Config.Callback.CallbackUserOffline); err != nil { + if err := http.CallBackPostReturn(ctx, globalConfig.Callback.CallbackUrl, req, resp, globalConfig.Callback.CallbackUserOffline); err != nil { return err } return nil diff --git a/internal/msggateway/hub_server.go b/internal/msggateway/hub_server.go index 97c98e8cd..3fa2b7dce 100644 --- a/internal/msggateway/hub_server.go +++ b/internal/msggateway/hub_server.go @@ -34,7 +34,7 @@ import ( ) func (s *Server) InitServer(config *config.GlobalConfig, disCov discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) error { - rdb, err := cache.NewRedis() + rdb, err := cache.NewRedis(config) if err != nil { return err } @@ -49,7 +49,7 @@ func (s *Server) InitServer(config *config.GlobalConfig, disCov discoveryregistr func (s *Server) Start(conf *config.GlobalConfig) error { return startrpc.Start( s.rpcPort, - config.Config.RpcRegisterName.OpenImMessageGatewayName, + conf.RpcRegisterName.OpenImMessageGatewayName, s.prometheusPort, conf, s.InitServer, diff --git a/internal/msggateway/init.go b/internal/msggateway/init.go index 36b6ea88e..ef5a8f81a 100644 --- a/internal/msggateway/init.go +++ b/internal/msggateway/init.go @@ -32,6 +32,7 @@ func RunWsAndServer(conf *config.GlobalConfig, rpcPort, wsPort, prometheusPort i config.Version, ) longServer, err := NewWsServer( + conf, WithPort(wsPort), WithMaxConnNum(int64(conf.LongConnSvr.WebsocketMaxConnNum)), WithHandshakeTimeout(time.Duration(conf.LongConnSvr.WebsocketTimeout)*time.Second), diff --git a/internal/msggateway/n_ws_server.go b/internal/msggateway/n_ws_server.go index c16da7c64..ffefb6e5d 100644 --- a/internal/msggateway/n_ws_server.go +++ b/internal/msggateway/n_ws_server.go @@ -68,6 +68,7 @@ var bufferPool = sync.Pool{ } type WsServer struct { + globalConfig *config.GlobalConfig port int wsMaxConnNum int64 registerChan chan *Client @@ -107,12 +108,12 @@ func (ws *WsServer) SetUserOnlineStatus(ctx context.Context, client *Client, sta } switch status { case constant.Online: - err := CallbackUserOnline(ctx, client.UserID, client.PlatformID, client.IsBackground, client.ctx.GetConnID()) + err := CallbackUserOnline(ctx, ws.globalConfig, client.UserID, client.PlatformID, client.IsBackground, client.ctx.GetConnID()) if err != nil { log.ZWarn(ctx, "CallbackUserOnline err", err) } case constant.Offline: - err := CallbackUserOffline(ctx, client.UserID, client.PlatformID, client.ctx.GetConnID()) + err := CallbackUserOffline(ctx, ws.globalConfig, client.UserID, client.PlatformID, client.ctx.GetConnID()) if err != nil { log.ZWarn(ctx, "CallbackUserOffline err", err) } @@ -140,13 +141,14 @@ func (ws *WsServer) GetUserPlatformCons(userID string, platform int) ([]*Client, return ws.clients.Get(userID, platform) } -func NewWsServer(opts ...Option) (*WsServer, error) { +func NewWsServer(globalConfig *config.GlobalConfig, opts ...Option) (*WsServer, error) { var config configs for _, o := range opts { o(&config) } v := validator.New() return &WsServer{ + globalConfig: globalConfig, port: config.port, wsMaxConnNum: config.maxConnNum, writeBufferSize: config.writeBufferSize, @@ -220,7 +222,7 @@ func (ws *WsServer) Run(done chan error) error { var concurrentRequest = 3 func (ws *WsServer) sendUserOnlineInfoToOtherNode(ctx context.Context, client *Client) error { - conns, err := ws.disCov.GetConns(ctx, config.Config.RpcRegisterName.OpenImMessageGatewayName) + conns, err := ws.disCov.GetConns(ctx, ws.globalConfig.RpcRegisterName.OpenImMessageGatewayName) if err != nil { return err } @@ -285,7 +287,7 @@ func (ws *WsServer) registerClient(client *Client) { } wg := sync.WaitGroup{} - if config.Config.Envs.Discovery == "zookeeper" { + if ws.globalConfig.Envs.Discovery == "zookeeper" { wg.Add(1) go func() { defer wg.Done() @@ -328,7 +330,7 @@ func (ws *WsServer) KickUserConn(client *Client) error { } func (ws *WsServer) multiTerminalLoginChecker(clientOK bool, oldClients []*Client, newClient *Client) { - switch config.Config.MultiLoginPolicy { + switch ws.globalConfig.MultiLoginPolicy { case constant.DefalutNotKick: case constant.PCAndOther: if constant.PlatformIDToClass(newClient.PlatformID) == constant.TerminalPC { diff --git a/internal/msgtransfer/init.go b/internal/msgtransfer/init.go index 2493933f3..fb4e80b20 100644 --- a/internal/msgtransfer/init.go +++ b/internal/msgtransfer/init.go @@ -53,7 +53,7 @@ type MsgTransfer struct { } func StartTransfer(config *config.GlobalConfig, prometheusPort int) error { - rdb, err := cache.NewRedis() + rdb, err := cache.NewRedis(config) if err != nil { return err } @@ -83,19 +83,19 @@ func StartTransfer(config *config.GlobalConfig, prometheusPort int) error { } conversationRpcClient := rpcclient.NewConversationRpcClient(client) groupRpcClient := rpcclient.NewGroupRpcClient(client) - msgTransfer, err := NewMsgTransfer(msgDatabase, &conversationRpcClient, &groupRpcClient) + msgTransfer, err := NewMsgTransfer(config, msgDatabase, &conversationRpcClient, &groupRpcClient) if err != nil { return err } - return msgTransfer.Start(prometheusPort) + return msgTransfer.Start(prometheusPort, config) } -func NewMsgTransfer(msgDatabase controller.CommonMsgDatabase, conversationRpcClient *rpcclient.ConversationRpcClient, groupRpcClient *rpcclient.GroupRpcClient) (*MsgTransfer, error) { - historyCH, err := NewOnlineHistoryRedisConsumerHandler(msgDatabase, conversationRpcClient, groupRpcClient) +func NewMsgTransfer(config *config.GlobalConfig, msgDatabase controller.CommonMsgDatabase, conversationRpcClient *rpcclient.ConversationRpcClient, groupRpcClient *rpcclient.GroupRpcClient) (*MsgTransfer, error) { + historyCH, err := NewOnlineHistoryRedisConsumerHandler(config, msgDatabase, conversationRpcClient, groupRpcClient) if err != nil { return nil, err } - historyMongoCH, err := NewOnlineHistoryMongoConsumerHandler(msgDatabase) + historyMongoCH, err := NewOnlineHistoryMongoConsumerHandler(config, msgDatabase) if err != nil { return nil, err } @@ -106,7 +106,7 @@ func NewMsgTransfer(msgDatabase controller.CommonMsgDatabase, conversationRpcCli }, nil } -func (m *MsgTransfer) Start(prometheusPort int) error { +func (m *MsgTransfer) Start(prometheusPort int, config *config.GlobalConfig) error { fmt.Println("start msg transfer", "prometheusPort:", prometheusPort) if prometheusPort <= 0 { return errs.Wrap(errors.New("prometheusPort not correct")) @@ -124,7 +124,7 @@ func (m *MsgTransfer) Start(prometheusPort int) error { go m.historyCH.historyConsumerGroup.RegisterHandleAndConsumer(m.ctx, m.historyCH, onError) go m.historyMongoCH.historyConsumerGroup.RegisterHandleAndConsumer(m.ctx, m.historyMongoCH, onError) - if config.Config.Prometheus.Enable { + if config.Prometheus.Enable { go func() { proreg := prometheus.NewRegistry() proreg.MustRegister( diff --git a/internal/msgtransfer/online_history_msg_handler.go b/internal/msgtransfer/online_history_msg_handler.go index 6678715d4..43bb6a816 100644 --- a/internal/msgtransfer/online_history_msg_handler.go +++ b/internal/msgtransfer/online_history_msg_handler.go @@ -85,6 +85,7 @@ type OnlineHistoryRedisConsumerHandler struct { } func NewOnlineHistoryRedisConsumerHandler( + config *config.GlobalConfig, database controller.CommonMsgDatabase, conversationRpcClient *rpcclient.ConversationRpcClient, groupRpcClient *rpcclient.GroupRpcClient, @@ -103,8 +104,8 @@ func NewOnlineHistoryRedisConsumerHandler( och.historyConsumerGroup, err = kafka.NewMConsumerGroup(&kafka.MConsumerGroupConfig{ KafkaVersion: sarama.V2_0_0_0, OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false, - }, []string{config.Config.Kafka.LatestMsgToRedis.Topic}, - config.Config.Kafka.Addr, config.Config.Kafka.ConsumerGroupID.MsgToRedis) + }, []string{config.Kafka.LatestMsgToRedis.Topic}, + config.Kafka.Addr, config.Kafka.ConsumerGroupID.MsgToRedis) // statistics.NewStatistics(&och.singleMsgSuccessCount, config.Config.ModuleName.MsgTransferName, fmt.Sprintf("%d // second singleMsgCount insert to mongo", constant.StatisticsTimeInterval), constant.StatisticsTimeInterval) return &och, err diff --git a/internal/msgtransfer/online_msg_to_mongo_handler.go b/internal/msgtransfer/online_msg_to_mongo_handler.go index 6e6c4c819..783140601 100644 --- a/internal/msgtransfer/online_msg_to_mongo_handler.go +++ b/internal/msgtransfer/online_msg_to_mongo_handler.go @@ -34,12 +34,12 @@ type OnlineHistoryMongoConsumerHandler struct { msgDatabase controller.CommonMsgDatabase } -func NewOnlineHistoryMongoConsumerHandler(database controller.CommonMsgDatabase) (*OnlineHistoryMongoConsumerHandler, error) { +func NewOnlineHistoryMongoConsumerHandler(config *config.GlobalConfig, database controller.CommonMsgDatabase) (*OnlineHistoryMongoConsumerHandler, error) { historyConsumerGroup, err := kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{ KafkaVersion: sarama.V2_0_0_0, OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false, - }, []string{config.Config.Kafka.MsgToMongo.Topic}, - config.Config.Kafka.Addr, config.Config.Kafka.ConsumerGroupID.MsgToMongo) + }, []string{config.Kafka.MsgToMongo.Topic}, + config.Kafka.Addr, config.Kafka.ConsumerGroupID.MsgToMongo) if err != nil { return nil, err } diff --git a/internal/push/callback.go b/internal/push/callback.go index a572fa572..4debee19e 100644 --- a/internal/push/callback.go +++ b/internal/push/callback.go @@ -27,17 +27,14 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/common/http" ) -func url() string { - return config.Config.Callback.CallbackUrl -} - func callbackOfflinePush( ctx context.Context, + config *config.GlobalConfig, userIDs []string, msg *sdkws.MsgData, offlinePushUserIDs *[]string, ) error { - if !config.Config.Callback.CallbackOfflinePush.Enable || msg.ContentType == constant.Typing { + if !config.Callback.CallbackOfflinePush.Enable || msg.ContentType == constant.Typing { return nil } req := &callbackstruct.CallbackBeforePushReq{ @@ -60,7 +57,7 @@ func callbackOfflinePush( Content: GetContent(msg), } resp := &callbackstruct.CallbackBeforePushResp{} - if err := http.CallBackPostReturn(ctx, url(), req, resp, config.Config.Callback.CallbackOfflinePush); err != nil { + if err := http.CallBackPostReturn(ctx, config.Callback.CallbackUrl, req, resp, config.Callback.CallbackOfflinePush); err != nil { return err } if len(resp.UserIDs) != 0 { @@ -72,8 +69,8 @@ func callbackOfflinePush( return nil } -func callbackOnlinePush(ctx context.Context, userIDs []string, msg *sdkws.MsgData) error { - if !config.Config.Callback.CallbackOnlinePush.Enable || utils.Contain(msg.SendID, userIDs...) || msg.ContentType == constant.Typing { +func callbackOnlinePush(ctx context.Context, config *config.GlobalConfig, userIDs []string, msg *sdkws.MsgData) error { + if !config.Callback.CallbackOnlinePush.Enable || utils.Contain(msg.SendID, userIDs...) || msg.ContentType == constant.Typing { return nil } req := callbackstruct.CallbackBeforePushReq{ @@ -95,7 +92,7 @@ func callbackOnlinePush(ctx context.Context, userIDs []string, msg *sdkws.MsgDat Content: GetContent(msg), } resp := &callbackstruct.CallbackBeforePushResp{} - if err := http.CallBackPostReturn(ctx, url(), req, resp, config.Config.Callback.CallbackOnlinePush); err != nil { + if err := http.CallBackPostReturn(ctx, config.Callback.CallbackUrl, req, resp, config.Callback.CallbackOnlinePush); err != nil { return err } return nil @@ -103,11 +100,12 @@ func callbackOnlinePush(ctx context.Context, userIDs []string, msg *sdkws.MsgDat func callbackBeforeSuperGroupOnlinePush( ctx context.Context, + config *config.GlobalConfig, groupID string, msg *sdkws.MsgData, pushToUserIDs *[]string, ) error { - if !config.Config.Callback.CallbackBeforeSuperGroupOnlinePush.Enable || msg.ContentType == constant.Typing { + if !config.Callback.CallbackBeforeSuperGroupOnlinePush.Enable || msg.ContentType == constant.Typing { return nil } req := callbackstruct.CallbackBeforeSuperGroupOnlinePushReq{ @@ -127,7 +125,7 @@ func callbackBeforeSuperGroupOnlinePush( Seq: msg.Seq, } resp := &callbackstruct.CallbackBeforeSuperGroupOnlinePushResp{} - if err := http.CallBackPostReturn(ctx, config.Config.Callback.CallbackUrl, req, resp, config.Config.Callback.CallbackBeforeSuperGroupOnlinePush); err != nil { + if err := http.CallBackPostReturn(ctx, config.Callback.CallbackUrl, req, resp, config.Callback.CallbackBeforeSuperGroupOnlinePush); err != nil { return err } diff --git a/internal/push/consumer_init.go b/internal/push/consumer_init.go index 572afe0eb..6e6a2e898 100644 --- a/internal/push/consumer_init.go +++ b/internal/push/consumer_init.go @@ -16,6 +16,7 @@ package push import ( "context" + "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/OpenIMSDK/tools/log" ) @@ -25,8 +26,8 @@ type Consumer struct { successCount uint64 } -func NewConsumer(pusher *Pusher) (*Consumer, error) { - c, err := NewConsumerHandler(pusher) +func NewConsumer(config *config.GlobalConfig, pusher *Pusher) (*Consumer, error) { + c, err := NewConsumerHandler(config, pusher) if err != nil { return nil, err } diff --git a/internal/push/offlinepush/fcm/push.go b/internal/push/offlinepush/fcm/push.go index 8145d4c17..4f82765b5 100644 --- a/internal/push/offlinepush/fcm/push.go +++ b/internal/push/offlinepush/fcm/push.go @@ -39,9 +39,9 @@ type Fcm struct { cache cache.MsgModel } -func NewClient(cache cache.MsgModel) *Fcm { +func NewClient(globalConfig *config.GlobalConfig, cache cache.MsgModel) *Fcm { projectRoot := config.GetProjectRoot() - credentialsFilePath := filepath.Join(projectRoot, "config", config.Config.Push.Fcm.ServiceAccount) + credentialsFilePath := filepath.Join(projectRoot, "config", globalConfig.Push.Fcm.ServiceAccount) opt := option.WithCredentialsFile(credentialsFilePath) fcmApp, err := firebase.NewApp(context.Background(), nil, opt) if err != nil { diff --git a/internal/push/offlinepush/getui/body.go b/internal/push/offlinepush/getui/body.go index 01eb22e73..46479163f 100644 --- a/internal/push/offlinepush/getui/body.go +++ b/internal/push/offlinepush/getui/body.go @@ -133,13 +133,13 @@ type Payload struct { IsSignal bool `json:"isSignal"` } -func newPushReq(title, content string) PushReq { +func newPushReq(config *config.GlobalConfig, title, content string) PushReq { pushReq := PushReq{PushMessage: &PushMessage{Notification: &Notification{ Title: title, Body: content, ClickType: "startapp", - ChannelID: config.Config.Push.GeTui.ChannelID, - ChannelName: config.Config.Push.GeTui.ChannelName, + ChannelID: config.Push.GeTui.ChannelID, + ChannelName: config.Push.GeTui.ChannelName, }}} return pushReq } diff --git a/internal/push/offlinepush/getui/push.go b/internal/push/offlinepush/getui/push.go index b657c9c23..288d87082 100644 --- a/internal/push/offlinepush/getui/push.go +++ b/internal/push/offlinepush/getui/push.go @@ -59,10 +59,15 @@ type Client struct { cache cache.MsgModel tokenExpireTime int64 taskIDTTL int64 + config *config.GlobalConfig } -func NewClient(cache cache.MsgModel) *Client { - return &Client{cache: cache, tokenExpireTime: tokenExpireTime, taskIDTTL: taskIDTTL} +func NewClient(config *config.GlobalConfig, cache cache.MsgModel) *Client { + return &Client{cache: cache, + tokenExpireTime: tokenExpireTime, + taskIDTTL: taskIDTTL, + config: config, + } } func (g *Client) Push(ctx context.Context, userIDs []string, title, content string, opts *offlinepush.Opts) error { @@ -78,7 +83,7 @@ func (g *Client) Push(ctx context.Context, userIDs []string, title, content stri return err } } - pushReq := newPushReq(title, content) + pushReq := newPushReq(g.config, title, content) pushReq.setPushChannel(title, content) if len(userIDs) > 1 { maxNum := 999 @@ -114,13 +119,13 @@ func (g *Client) Push(ctx context.Context, userIDs []string, title, content stri func (g *Client) Auth(ctx context.Context, timeStamp int64) (token string, expireTime int64, err error) { h := sha256.New() h.Write( - []byte(config.Config.Push.GeTui.AppKey + strconv.Itoa(int(timeStamp)) + config.Config.Push.GeTui.MasterSecret), + []byte(g.config.Push.GeTui.AppKey + strconv.Itoa(int(timeStamp)) + g.config.Push.GeTui.MasterSecret), ) sign := hex.EncodeToString(h.Sum(nil)) reqAuth := AuthReq{ Sign: sign, Timestamp: strconv.Itoa(int(timeStamp)), - AppKey: config.Config.Push.GeTui.AppKey, + AppKey: g.config.Push.GeTui.AppKey, } respAuth := AuthResp{} err = g.request(ctx, authURL, reqAuth, "", &respAuth) @@ -163,7 +168,7 @@ func (g *Client) request(ctx context.Context, url string, input any, token strin header := map[string]string{"token": token} resp := &Resp{} resp.Data = output - return g.postReturn(ctx, config.Config.Push.GeTui.PushUrl+url, header, input, resp, 3) + return g.postReturn(ctx, g.config.Push.GeTui.PushUrl+url, header, input, resp, 3) } func (g *Client) postReturn( diff --git a/internal/push/offlinepush/jpush/body/notification.go b/internal/push/offlinepush/jpush/body/notification.go index ddf3802af..b25882ea5 100644 --- a/internal/push/offlinepush/jpush/body/notification.go +++ b/internal/push/offlinepush/jpush/body/notification.go @@ -46,7 +46,6 @@ type Extras struct { func (n *Notification) SetAlert(alert string) { n.Alert = alert n.Android.Alert = alert - n.SetAndroidIntent() n.IOS.Alert = alert n.IOS.Sound = "default" n.IOS.Badge = "+1" @@ -57,8 +56,8 @@ func (n *Notification) SetExtras(extras Extras) { n.Android.Extras = extras } -func (n *Notification) SetAndroidIntent() { - n.Android.Intent.URL = config.Config.Push.Jpns.PushIntent +func (n *Notification) SetAndroidIntent(config *config.GlobalConfig) { + n.Android.Intent.URL = config.Push.Jpns.PushIntent } func (n *Notification) IOSEnableMutableContent() { diff --git a/internal/push/offlinepush/jpush/push.go b/internal/push/offlinepush/jpush/push.go index 567269f3c..2ced4bfd3 100644 --- a/internal/push/offlinepush/jpush/push.go +++ b/internal/push/offlinepush/jpush/push.go @@ -25,10 +25,12 @@ import ( http2 "github.com/openimsdk/open-im-server/v3/pkg/common/http" ) -type JPush struct{} +type JPush struct { + config *config.GlobalConfig +} -func NewClient() *JPush { - return &JPush{} +func NewClient(config *config.GlobalConfig) *JPush { + return &JPush{config: config} } func (j *JPush) Auth(apiKey, secretKey string, timeStamp int64) (token string, err error) { @@ -59,10 +61,12 @@ func (j *JPush) Push(ctx context.Context, userIDs []string, title, content strin no.IOSEnableMutableContent() no.SetExtras(extras) no.SetAlert(title) + no.SetAndroidIntent(j.config) + var msg body.Message msg.SetMsgContent(content) var opt body.Options - opt.SetApnsProduction(config.Config.IOSPush.Production) + opt.SetApnsProduction(j.config.IOSPush.Production) var pushObj body.PushObj pushObj.SetPlatform(&pf) pushObj.SetAudience(&au) @@ -76,9 +80,9 @@ func (j *JPush) Push(ctx context.Context, userIDs []string, title, content strin func (j *JPush) request(ctx context.Context, po body.PushObj, resp any, timeout int) error { return http2.PostReturn( ctx, - config.Config.Push.Jpns.PushUrl, + j.config.Push.Jpns.PushUrl, map[string]string{ - "Authorization": j.getAuthorization(config.Config.Push.Jpns.AppKey, config.Config.Push.Jpns.MasterSecret), + "Authorization": j.getAuthorization(j.config.Push.Jpns.AppKey, j.config.Push.Jpns.MasterSecret), }, po, resp, diff --git a/internal/push/push_handler.go b/internal/push/push_handler.go index 19d42ebb9..123786a83 100644 --- a/internal/push/push_handler.go +++ b/internal/push/push_handler.go @@ -35,15 +35,15 @@ type ConsumerHandler struct { pusher *Pusher } -func NewConsumerHandler(pusher *Pusher) (*ConsumerHandler, error) { +func NewConsumerHandler(config *config.GlobalConfig, pusher *Pusher) (*ConsumerHandler, error) { var consumerHandler ConsumerHandler consumerHandler.pusher = pusher var err error consumerHandler.pushConsumerGroup, err = kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{ KafkaVersion: sarama.V2_0_0_0, OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false, - }, []string{config.Config.Kafka.MsgToPush.Topic}, config.Config.Kafka.Addr, - config.Config.Kafka.ConsumerGroupID.MsgToPush) + }, []string{config.Kafka.MsgToPush.Topic}, config.Kafka.Addr, + config.Kafka.ConsumerGroupID.MsgToPush) if err != nil { return nil, err } diff --git a/internal/push/push_rpc_server.go b/internal/push/push_rpc_server.go index c7ad3fba8..88778aea9 100644 --- a/internal/push/push_rpc_server.go +++ b/internal/push/push_rpc_server.go @@ -39,17 +39,18 @@ type pushServer struct { } func Start(config *config.GlobalConfig, client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) error { - rdb, err := cache.NewRedis() + rdb, err := cache.NewRedis(config) if err != nil { return err } cacheModel := cache.NewMsgCacheModel(rdb) - offlinePusher := NewOfflinePusher(cacheModel) + offlinePusher := NewOfflinePusher(config, cacheModel) database := controller.NewPushDatabase(cacheModel) groupRpcClient := rpcclient.NewGroupRpcClient(client) conversationRpcClient := rpcclient.NewConversationRpcClient(client) msgRpcClient := rpcclient.NewMessageRpcClient(client) pusher := NewPusher( + config, client, offlinePusher, database, @@ -65,7 +66,7 @@ func Start(config *config.GlobalConfig, client discoveryregistry.SvcDiscoveryReg config: config, }) - consumer, err := NewConsumer(pusher) + consumer, err := NewConsumer(config, pusher) if err != nil { return err } diff --git a/internal/push/push_to_client.go b/internal/push/push_to_client.go index 5fce34e83..7c26dc54b 100644 --- a/internal/push/push_to_client.go +++ b/internal/push/push_to_client.go @@ -48,6 +48,7 @@ import ( ) type Pusher struct { + config *config.GlobalConfig database controller.PushDatabase discov discoveryregistry.SvcDiscoveryRegistry offlinePusher offlinepush.OfflinePusher @@ -60,11 +61,12 @@ type Pusher struct { var errNoOfflinePusher = errors.New("no offlinePusher is configured") -func NewPusher(discov discoveryregistry.SvcDiscoveryRegistry, offlinePusher offlinepush.OfflinePusher, database controller.PushDatabase, +func NewPusher(config *config.GlobalConfig, discov discoveryregistry.SvcDiscoveryRegistry, offlinePusher offlinepush.OfflinePusher, database controller.PushDatabase, groupLocalCache *localcache.GroupLocalCache, conversationLocalCache *localcache.ConversationLocalCache, conversationRpcClient *rpcclient.ConversationRpcClient, groupRpcClient *rpcclient.GroupRpcClient, msgRpcClient *rpcclient.MessageRpcClient, ) *Pusher { return &Pusher{ + config: config, discov: discov, database: database, offlinePusher: offlinePusher, @@ -76,15 +78,15 @@ func NewPusher(discov discoveryregistry.SvcDiscoveryRegistry, offlinePusher offl } } -func NewOfflinePusher(cache cache.MsgModel) offlinepush.OfflinePusher { +func NewOfflinePusher(config *config.GlobalConfig, cache cache.MsgModel) offlinepush.OfflinePusher { var offlinePusher offlinepush.OfflinePusher - switch config.Config.Push.Enable { + switch config.Push.Enable { case "getui": - offlinePusher = getui.NewClient(cache) + offlinePusher = getui.NewClient(config, cache) case "fcm": - offlinePusher = fcm.NewClient(cache) + offlinePusher = fcm.NewClient(config, cache) case "jpush": - offlinePusher = jpush.NewClient() + offlinePusher = jpush.NewClient(config) default: offlinePusher = dummy.NewClient() } @@ -102,7 +104,7 @@ func (p *Pusher) DeleteMemberAndSetConversationSeq(ctx context.Context, groupID func (p *Pusher) Push2User(ctx context.Context, userIDs []string, msg *sdkws.MsgData) error { log.ZDebug(ctx, "Get msg from msg_transfer And push msg", "userIDs", userIDs, "msg", msg.String()) - if err := callbackOnlinePush(ctx, userIDs, msg); err != nil { + if err := callbackOnlinePush(ctx, p.config, userIDs, msg); err != nil { return err } // push @@ -130,7 +132,7 @@ func (p *Pusher) Push2User(ctx context.Context, userIDs []string, msg *sdkws.Msg }) if len(offlinePushUserIDList) > 0 { - if err = callbackOfflinePush(ctx, offlinePushUserIDList, msg, &[]string{}); err != nil { + if err = callbackOfflinePush(ctx, p.config, offlinePushUserIDList, msg, &[]string{}); err != nil { return err } err = p.offlinePushMsg(ctx, msg.SendID, msg, offlinePushUserIDList) @@ -163,7 +165,7 @@ func (p *Pusher) k8sOfflinePush2SuperGroup(ctx context.Context, groupID string, } if len(needOfflinePushUserIDs) > 0 { var offlinePushUserIDs []string - err := callbackOfflinePush(ctx, needOfflinePushUserIDs, msg, &offlinePushUserIDs) + err := callbackOfflinePush(ctx, p.config, needOfflinePushUserIDs, msg, &offlinePushUserIDs) if err != nil { return err } @@ -194,7 +196,7 @@ func (p *Pusher) k8sOfflinePush2SuperGroup(ctx context.Context, groupID string, func (p *Pusher) Push2SuperGroup(ctx context.Context, groupID string, msg *sdkws.MsgData) (err error) { log.ZDebug(ctx, "Get super group msg from msg_transfer and push msg", "msg", msg.String(), "groupID", groupID) var pushToUserIDs []string - if err = callbackBeforeSuperGroupOnlinePush(ctx, groupID, msg, &pushToUserIDs); err != nil { + if err = callbackBeforeSuperGroupOnlinePush(ctx, p.config, groupID, msg, &pushToUserIDs); err != nil { return err } @@ -235,11 +237,11 @@ func (p *Pusher) Push2SuperGroup(ctx context.Context, groupID string, msg *sdkws return err } log.ZInfo(ctx, "GroupDismissedNotificationInfo****", "groupID", groupID, "num", len(pushToUserIDs), "list", pushToUserIDs) - if len(config.Config.Manager.UserID) > 0 { - ctx = mcontext.WithOpUserIDContext(ctx, config.Config.Manager.UserID[0]) + if len(p.config.Manager.UserID) > 0 { + ctx = mcontext.WithOpUserIDContext(ctx, p.config.Manager.UserID[0]) } - if len(config.Config.Manager.UserID) == 0 && len(config.Config.IMAdmin.UserID) > 0 { - ctx = mcontext.WithOpUserIDContext(ctx, config.Config.IMAdmin.UserID[0]) + if len(p.config.Manager.UserID) == 0 && len(p.config.IMAdmin.UserID) > 0 { + ctx = mcontext.WithOpUserIDContext(ctx, p.config.IMAdmin.UserID[0]) } defer func(groupID string) { if err = p.groupRpcClient.DismissGroup(ctx, groupID); err != nil { @@ -257,10 +259,10 @@ func (p *Pusher) Push2SuperGroup(ctx context.Context, groupID string, msg *sdkws log.ZDebug(ctx, "get conn and online push success", "result", wsResults, "msg", msg) isOfflinePush := utils.GetSwitchFromOptions(msg.Options, constant.IsOfflinePush) - if isOfflinePush && config.Config.Envs.Discovery == "k8s" { + if isOfflinePush && p.config.Envs.Discovery == "k8s" { return p.k8sOfflinePush2SuperGroup(ctx, groupID, msg, wsResults) } - if isOfflinePush && config.Config.Envs.Discovery == "zookeeper" { + if isOfflinePush && p.config.Envs.Discovery == "zookeeper" { var ( onlineSuccessUserIDs = []string{msg.SendID} webAndPcBackgroundUserIDs []string @@ -298,7 +300,7 @@ func (p *Pusher) Push2SuperGroup(ctx context.Context, groupID string, msg *sdkws // Use offline push messaging if len(needOfflinePushUserIDs) > 0 { var offlinePushUserIDs []string - err = callbackOfflinePush(ctx, needOfflinePushUserIDs, msg, &offlinePushUserIDs) + err = callbackOfflinePush(ctx, p.config, needOfflinePushUserIDs, msg, &offlinePushUserIDs) if err != nil { return err } @@ -357,7 +359,7 @@ func (p *Pusher) k8sOnlinePush(ctx context.Context, msg *sdkws.MsgData, pushToUs var ( mu sync.Mutex wg = errgroup.Group{} - maxWorkers = config.Config.Push.MaxConcurrentWorkers + maxWorkers = p.config.Push.MaxConcurrentWorkers ) if maxWorkers < 3 { maxWorkers = 3 @@ -386,10 +388,10 @@ func (p *Pusher) k8sOnlinePush(ctx context.Context, msg *sdkws.MsgData, pushToUs return wsResults, nil } func (p *Pusher) GetConnsAndOnlinePush(ctx context.Context, msg *sdkws.MsgData, pushToUserIDs []string) (wsResults []*msggateway.SingleMsgToUserResults, err error) { - if config.Config.Envs.Discovery == "k8s" { + if p.config.Envs.Discovery == "k8s" { return p.k8sOnlinePush(ctx, msg, pushToUserIDs) } - conns, err := p.discov.GetConns(ctx, config.Config.RpcRegisterName.OpenImMessageGatewayName) + conns, err := p.discov.GetConns(ctx, p.config.RpcRegisterName.OpenImMessageGatewayName) log.ZDebug(ctx, "get gateway conn", "conn length", len(conns)) if err != nil { return nil, err @@ -399,7 +401,7 @@ func (p *Pusher) GetConnsAndOnlinePush(ctx context.Context, msg *sdkws.MsgData, mu sync.Mutex wg = errgroup.Group{} input = &msggateway.OnlineBatchPushOneMsgReq{MsgData: msg, PushToUserIDs: pushToUserIDs} - maxWorkers = config.Config.Push.MaxConcurrentWorkers + maxWorkers = p.config.Push.MaxConcurrentWorkers ) if maxWorkers < 3 { diff --git a/pkg/common/db/cache/init_redis.go b/pkg/common/db/cache/init_redis.go index 3cec73be5..a8189fdbc 100644 --- a/pkg/common/db/cache/init_redis.go +++ b/pkg/common/db/cache/init_redis.go @@ -40,7 +40,7 @@ const ( ) // NewRedis Initialize redis connection. -func NewRedis() (redis.UniversalClient, error) { +func NewRedis(config *config.GlobalConfig) (redis.UniversalClient, error) { if redisClient != nil { return redisClient, nil } @@ -48,24 +48,24 @@ func NewRedis() (redis.UniversalClient, error) { // Read configuration from environment variables overrideConfigFromEnv() - if len(config.Config.Redis.Address) == 0 { + if len(config.Redis.Address) == 0 { return nil, errs.Wrap(errors.New("redis address is empty")) } specialerror.AddReplace(redis.Nil, errs.ErrRecordNotFound) var rdb redis.UniversalClient - if len(config.Config.Redis.Address) > 1 || config.Config.Redis.ClusterMode { + if len(config.Redis.Address) > 1 || config.Redis.ClusterMode { rdb = redis.NewClusterClient(&redis.ClusterOptions{ - Addrs: config.Config.Redis.Address, - Username: config.Config.Redis.Username, - Password: config.Config.Redis.Password, // no password set + Addrs: config.Redis.Address, + Username: config.Redis.Username, + Password: config.Redis.Password, // no password set PoolSize: 50, MaxRetries: maxRetry, }) } else { rdb = redis.NewClient(&redis.Options{ - Addr: config.Config.Redis.Address[0], - Username: config.Config.Redis.Username, - Password: config.Config.Redis.Password, + Addr: config.Redis.Address[0], + Username: config.Redis.Username, + Password: config.Redis.Password, DB: 0, // use default DB PoolSize: 100, // connection pool size MaxRetries: maxRetry, @@ -78,7 +78,8 @@ func NewRedis() (redis.UniversalClient, error) { err = rdb.Ping(ctx).Err() if err != nil { uriFormat := "address:%s, username:%s, password:%s, clusterMode:%t, enablePipeline:%t" - errMsg := fmt.Sprintf(uriFormat, config.Config.Redis.Address, config.Config.Redis.Username, config.Config.Redis.Password, config.Config.Redis.ClusterMode, config.Config.Redis.EnablePipeline) + errMsg := fmt.Sprintf(uriFormat, config.Redis.Address, config.Redis.Username, + config.Redis.Password, config.Redis.ClusterMode, config.Redis.EnablePipeline) return nil, errs.Wrap(err, errMsg) } redisClient = rdb