optimization: config file changed to dependency injection.

pull/1960/head
Gordon 2 years ago
parent d0b16a6c16
commit 79ee07a13f

@ -26,12 +26,8 @@ import (
"github.com/openimsdk/open-im-server/v3/pkg/common/http"
)
func callBackURL() string {
return config.Config.Callback.CallbackUrl
}
func CallbackUserOnline(ctx context.Context, userID string, platformID int, isAppBackground bool, connID string) error {
if !config.Config.Callback.CallbackUserOnline.Enable {
func CallbackUserOnline(ctx context.Context, globalConfig *config.GlobalConfig, userID string, platformID int, isAppBackground bool, connID string) error {
if !globalConfig.Callback.CallbackUserOnline.Enable {
return nil
}
req := cbapi.CallbackUserOnlineReq{
@ -49,14 +45,14 @@ func CallbackUserOnline(ctx context.Context, userID string, platformID int, isAp
ConnID: connID,
}
resp := cbapi.CommonCallbackResp{}
if err := http.CallBackPostReturn(ctx, callBackURL(), &req, &resp, config.Config.Callback.CallbackUserOnline); err != nil {
if err := http.CallBackPostReturn(ctx, globalConfig.Callback.CallbackUrl, &req, &resp, globalConfig.Callback.CallbackUserOnline); err != nil {
return err
}
return nil
}
func CallbackUserOffline(ctx context.Context, userID string, platformID int, connID string) error {
if !config.Config.Callback.CallbackUserOffline.Enable {
func CallbackUserOffline(ctx context.Context, globalConfig *config.GlobalConfig, userID string, platformID int, connID string) error {
if !globalConfig.Callback.CallbackUserOffline.Enable {
return nil
}
req := &cbapi.CallbackUserOfflineReq{
@ -73,14 +69,14 @@ func CallbackUserOffline(ctx context.Context, userID string, platformID int, con
ConnID: connID,
}
resp := &cbapi.CallbackUserOfflineResp{}
if err := http.CallBackPostReturn(ctx, callBackURL(), req, resp, config.Config.Callback.CallbackUserOffline); err != nil {
if err := http.CallBackPostReturn(ctx, globalConfig.Callback.CallbackUrl, req, resp, globalConfig.Callback.CallbackUserOffline); err != nil {
return err
}
return nil
}
func CallbackUserKickOff(ctx context.Context, userID string, platformID int) error {
if !config.Config.Callback.CallbackUserKickOff.Enable {
func CallbackUserKickOff(ctx context.Context, globalConfig *config.GlobalConfig, userID string, platformID int) error {
if !globalConfig.Callback.CallbackUserKickOff.Enable {
return nil
}
req := &cbapi.CallbackUserKickOffReq{
@ -96,7 +92,7 @@ func CallbackUserKickOff(ctx context.Context, userID string, platformID int) err
Seq: time.Now().UnixMilli(),
}
resp := &cbapi.CommonCallbackResp{}
if err := http.CallBackPostReturn(ctx, callBackURL(), req, resp, config.Config.Callback.CallbackUserOffline); err != nil {
if err := http.CallBackPostReturn(ctx, globalConfig.Callback.CallbackUrl, req, resp, globalConfig.Callback.CallbackUserOffline); err != nil {
return err
}
return nil

@ -34,7 +34,7 @@ import (
)
func (s *Server) InitServer(config *config.GlobalConfig, disCov discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) error {
rdb, err := cache.NewRedis()
rdb, err := cache.NewRedis(config)
if err != nil {
return err
}
@ -49,7 +49,7 @@ func (s *Server) InitServer(config *config.GlobalConfig, disCov discoveryregistr
func (s *Server) Start(conf *config.GlobalConfig) error {
return startrpc.Start(
s.rpcPort,
config.Config.RpcRegisterName.OpenImMessageGatewayName,
conf.RpcRegisterName.OpenImMessageGatewayName,
s.prometheusPort,
conf,
s.InitServer,

@ -32,6 +32,7 @@ func RunWsAndServer(conf *config.GlobalConfig, rpcPort, wsPort, prometheusPort i
config.Version,
)
longServer, err := NewWsServer(
conf,
WithPort(wsPort),
WithMaxConnNum(int64(conf.LongConnSvr.WebsocketMaxConnNum)),
WithHandshakeTimeout(time.Duration(conf.LongConnSvr.WebsocketTimeout)*time.Second),

@ -68,6 +68,7 @@ var bufferPool = sync.Pool{
}
type WsServer struct {
globalConfig *config.GlobalConfig
port int
wsMaxConnNum int64
registerChan chan *Client
@ -107,12 +108,12 @@ func (ws *WsServer) SetUserOnlineStatus(ctx context.Context, client *Client, sta
}
switch status {
case constant.Online:
err := CallbackUserOnline(ctx, client.UserID, client.PlatformID, client.IsBackground, client.ctx.GetConnID())
err := CallbackUserOnline(ctx, ws.globalConfig, client.UserID, client.PlatformID, client.IsBackground, client.ctx.GetConnID())
if err != nil {
log.ZWarn(ctx, "CallbackUserOnline err", err)
}
case constant.Offline:
err := CallbackUserOffline(ctx, client.UserID, client.PlatformID, client.ctx.GetConnID())
err := CallbackUserOffline(ctx, ws.globalConfig, client.UserID, client.PlatformID, client.ctx.GetConnID())
if err != nil {
log.ZWarn(ctx, "CallbackUserOffline err", err)
}
@ -140,13 +141,14 @@ func (ws *WsServer) GetUserPlatformCons(userID string, platform int) ([]*Client,
return ws.clients.Get(userID, platform)
}
func NewWsServer(opts ...Option) (*WsServer, error) {
func NewWsServer(globalConfig *config.GlobalConfig, opts ...Option) (*WsServer, error) {
var config configs
for _, o := range opts {
o(&config)
}
v := validator.New()
return &WsServer{
globalConfig: globalConfig,
port: config.port,
wsMaxConnNum: config.maxConnNum,
writeBufferSize: config.writeBufferSize,
@ -220,7 +222,7 @@ func (ws *WsServer) Run(done chan error) error {
var concurrentRequest = 3
func (ws *WsServer) sendUserOnlineInfoToOtherNode(ctx context.Context, client *Client) error {
conns, err := ws.disCov.GetConns(ctx, config.Config.RpcRegisterName.OpenImMessageGatewayName)
conns, err := ws.disCov.GetConns(ctx, ws.globalConfig.RpcRegisterName.OpenImMessageGatewayName)
if err != nil {
return err
}
@ -285,7 +287,7 @@ func (ws *WsServer) registerClient(client *Client) {
}
wg := sync.WaitGroup{}
if config.Config.Envs.Discovery == "zookeeper" {
if ws.globalConfig.Envs.Discovery == "zookeeper" {
wg.Add(1)
go func() {
defer wg.Done()
@ -328,7 +330,7 @@ func (ws *WsServer) KickUserConn(client *Client) error {
}
func (ws *WsServer) multiTerminalLoginChecker(clientOK bool, oldClients []*Client, newClient *Client) {
switch config.Config.MultiLoginPolicy {
switch ws.globalConfig.MultiLoginPolicy {
case constant.DefalutNotKick:
case constant.PCAndOther:
if constant.PlatformIDToClass(newClient.PlatformID) == constant.TerminalPC {

@ -53,7 +53,7 @@ type MsgTransfer struct {
}
func StartTransfer(config *config.GlobalConfig, prometheusPort int) error {
rdb, err := cache.NewRedis()
rdb, err := cache.NewRedis(config)
if err != nil {
return err
}
@ -83,19 +83,19 @@ func StartTransfer(config *config.GlobalConfig, prometheusPort int) error {
}
conversationRpcClient := rpcclient.NewConversationRpcClient(client)
groupRpcClient := rpcclient.NewGroupRpcClient(client)
msgTransfer, err := NewMsgTransfer(msgDatabase, &conversationRpcClient, &groupRpcClient)
msgTransfer, err := NewMsgTransfer(config, msgDatabase, &conversationRpcClient, &groupRpcClient)
if err != nil {
return err
}
return msgTransfer.Start(prometheusPort)
return msgTransfer.Start(prometheusPort, config)
}
func NewMsgTransfer(msgDatabase controller.CommonMsgDatabase, conversationRpcClient *rpcclient.ConversationRpcClient, groupRpcClient *rpcclient.GroupRpcClient) (*MsgTransfer, error) {
historyCH, err := NewOnlineHistoryRedisConsumerHandler(msgDatabase, conversationRpcClient, groupRpcClient)
func NewMsgTransfer(config *config.GlobalConfig, msgDatabase controller.CommonMsgDatabase, conversationRpcClient *rpcclient.ConversationRpcClient, groupRpcClient *rpcclient.GroupRpcClient) (*MsgTransfer, error) {
historyCH, err := NewOnlineHistoryRedisConsumerHandler(config, msgDatabase, conversationRpcClient, groupRpcClient)
if err != nil {
return nil, err
}
historyMongoCH, err := NewOnlineHistoryMongoConsumerHandler(msgDatabase)
historyMongoCH, err := NewOnlineHistoryMongoConsumerHandler(config, msgDatabase)
if err != nil {
return nil, err
}
@ -106,7 +106,7 @@ func NewMsgTransfer(msgDatabase controller.CommonMsgDatabase, conversationRpcCli
}, nil
}
func (m *MsgTransfer) Start(prometheusPort int) error {
func (m *MsgTransfer) Start(prometheusPort int, config *config.GlobalConfig) error {
fmt.Println("start msg transfer", "prometheusPort:", prometheusPort)
if prometheusPort <= 0 {
return errs.Wrap(errors.New("prometheusPort not correct"))
@ -124,7 +124,7 @@ func (m *MsgTransfer) Start(prometheusPort int) error {
go m.historyCH.historyConsumerGroup.RegisterHandleAndConsumer(m.ctx, m.historyCH, onError)
go m.historyMongoCH.historyConsumerGroup.RegisterHandleAndConsumer(m.ctx, m.historyMongoCH, onError)
if config.Config.Prometheus.Enable {
if config.Prometheus.Enable {
go func() {
proreg := prometheus.NewRegistry()
proreg.MustRegister(

@ -85,6 +85,7 @@ type OnlineHistoryRedisConsumerHandler struct {
}
func NewOnlineHistoryRedisConsumerHandler(
config *config.GlobalConfig,
database controller.CommonMsgDatabase,
conversationRpcClient *rpcclient.ConversationRpcClient,
groupRpcClient *rpcclient.GroupRpcClient,
@ -103,8 +104,8 @@ func NewOnlineHistoryRedisConsumerHandler(
och.historyConsumerGroup, err = kafka.NewMConsumerGroup(&kafka.MConsumerGroupConfig{
KafkaVersion: sarama.V2_0_0_0,
OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false,
}, []string{config.Config.Kafka.LatestMsgToRedis.Topic},
config.Config.Kafka.Addr, config.Config.Kafka.ConsumerGroupID.MsgToRedis)
}, []string{config.Kafka.LatestMsgToRedis.Topic},
config.Kafka.Addr, config.Kafka.ConsumerGroupID.MsgToRedis)
// statistics.NewStatistics(&och.singleMsgSuccessCount, config.Config.ModuleName.MsgTransferName, fmt.Sprintf("%d
// second singleMsgCount insert to mongo", constant.StatisticsTimeInterval), constant.StatisticsTimeInterval)
return &och, err

@ -34,12 +34,12 @@ type OnlineHistoryMongoConsumerHandler struct {
msgDatabase controller.CommonMsgDatabase
}
func NewOnlineHistoryMongoConsumerHandler(database controller.CommonMsgDatabase) (*OnlineHistoryMongoConsumerHandler, error) {
func NewOnlineHistoryMongoConsumerHandler(config *config.GlobalConfig, database controller.CommonMsgDatabase) (*OnlineHistoryMongoConsumerHandler, error) {
historyConsumerGroup, err := kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{
KafkaVersion: sarama.V2_0_0_0,
OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false,
}, []string{config.Config.Kafka.MsgToMongo.Topic},
config.Config.Kafka.Addr, config.Config.Kafka.ConsumerGroupID.MsgToMongo)
}, []string{config.Kafka.MsgToMongo.Topic},
config.Kafka.Addr, config.Kafka.ConsumerGroupID.MsgToMongo)
if err != nil {
return nil, err
}

@ -27,17 +27,14 @@ import (
"github.com/openimsdk/open-im-server/v3/pkg/common/http"
)
func url() string {
return config.Config.Callback.CallbackUrl
}
func callbackOfflinePush(
ctx context.Context,
config *config.GlobalConfig,
userIDs []string,
msg *sdkws.MsgData,
offlinePushUserIDs *[]string,
) error {
if !config.Config.Callback.CallbackOfflinePush.Enable || msg.ContentType == constant.Typing {
if !config.Callback.CallbackOfflinePush.Enable || msg.ContentType == constant.Typing {
return nil
}
req := &callbackstruct.CallbackBeforePushReq{
@ -60,7 +57,7 @@ func callbackOfflinePush(
Content: GetContent(msg),
}
resp := &callbackstruct.CallbackBeforePushResp{}
if err := http.CallBackPostReturn(ctx, url(), req, resp, config.Config.Callback.CallbackOfflinePush); err != nil {
if err := http.CallBackPostReturn(ctx, config.Callback.CallbackUrl, req, resp, config.Callback.CallbackOfflinePush); err != nil {
return err
}
if len(resp.UserIDs) != 0 {
@ -72,8 +69,8 @@ func callbackOfflinePush(
return nil
}
func callbackOnlinePush(ctx context.Context, userIDs []string, msg *sdkws.MsgData) error {
if !config.Config.Callback.CallbackOnlinePush.Enable || utils.Contain(msg.SendID, userIDs...) || msg.ContentType == constant.Typing {
func callbackOnlinePush(ctx context.Context, config *config.GlobalConfig, userIDs []string, msg *sdkws.MsgData) error {
if !config.Callback.CallbackOnlinePush.Enable || utils.Contain(msg.SendID, userIDs...) || msg.ContentType == constant.Typing {
return nil
}
req := callbackstruct.CallbackBeforePushReq{
@ -95,7 +92,7 @@ func callbackOnlinePush(ctx context.Context, userIDs []string, msg *sdkws.MsgDat
Content: GetContent(msg),
}
resp := &callbackstruct.CallbackBeforePushResp{}
if err := http.CallBackPostReturn(ctx, url(), req, resp, config.Config.Callback.CallbackOnlinePush); err != nil {
if err := http.CallBackPostReturn(ctx, config.Callback.CallbackUrl, req, resp, config.Callback.CallbackOnlinePush); err != nil {
return err
}
return nil
@ -103,11 +100,12 @@ func callbackOnlinePush(ctx context.Context, userIDs []string, msg *sdkws.MsgDat
func callbackBeforeSuperGroupOnlinePush(
ctx context.Context,
config *config.GlobalConfig,
groupID string,
msg *sdkws.MsgData,
pushToUserIDs *[]string,
) error {
if !config.Config.Callback.CallbackBeforeSuperGroupOnlinePush.Enable || msg.ContentType == constant.Typing {
if !config.Callback.CallbackBeforeSuperGroupOnlinePush.Enable || msg.ContentType == constant.Typing {
return nil
}
req := callbackstruct.CallbackBeforeSuperGroupOnlinePushReq{
@ -127,7 +125,7 @@ func callbackBeforeSuperGroupOnlinePush(
Seq: msg.Seq,
}
resp := &callbackstruct.CallbackBeforeSuperGroupOnlinePushResp{}
if err := http.CallBackPostReturn(ctx, config.Config.Callback.CallbackUrl, req, resp, config.Config.Callback.CallbackBeforeSuperGroupOnlinePush); err != nil {
if err := http.CallBackPostReturn(ctx, config.Callback.CallbackUrl, req, resp, config.Callback.CallbackBeforeSuperGroupOnlinePush); err != nil {
return err
}

@ -16,6 +16,7 @@ package push
import (
"context"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/OpenIMSDK/tools/log"
)
@ -25,8 +26,8 @@ type Consumer struct {
successCount uint64
}
func NewConsumer(pusher *Pusher) (*Consumer, error) {
c, err := NewConsumerHandler(pusher)
func NewConsumer(config *config.GlobalConfig, pusher *Pusher) (*Consumer, error) {
c, err := NewConsumerHandler(config, pusher)
if err != nil {
return nil, err
}

@ -39,9 +39,9 @@ type Fcm struct {
cache cache.MsgModel
}
func NewClient(cache cache.MsgModel) *Fcm {
func NewClient(globalConfig *config.GlobalConfig, cache cache.MsgModel) *Fcm {
projectRoot := config.GetProjectRoot()
credentialsFilePath := filepath.Join(projectRoot, "config", config.Config.Push.Fcm.ServiceAccount)
credentialsFilePath := filepath.Join(projectRoot, "config", globalConfig.Push.Fcm.ServiceAccount)
opt := option.WithCredentialsFile(credentialsFilePath)
fcmApp, err := firebase.NewApp(context.Background(), nil, opt)
if err != nil {

@ -133,13 +133,13 @@ type Payload struct {
IsSignal bool `json:"isSignal"`
}
func newPushReq(title, content string) PushReq {
func newPushReq(config *config.GlobalConfig, title, content string) PushReq {
pushReq := PushReq{PushMessage: &PushMessage{Notification: &Notification{
Title: title,
Body: content,
ClickType: "startapp",
ChannelID: config.Config.Push.GeTui.ChannelID,
ChannelName: config.Config.Push.GeTui.ChannelName,
ChannelID: config.Push.GeTui.ChannelID,
ChannelName: config.Push.GeTui.ChannelName,
}}}
return pushReq
}

@ -59,10 +59,15 @@ type Client struct {
cache cache.MsgModel
tokenExpireTime int64
taskIDTTL int64
config *config.GlobalConfig
}
func NewClient(cache cache.MsgModel) *Client {
return &Client{cache: cache, tokenExpireTime: tokenExpireTime, taskIDTTL: taskIDTTL}
func NewClient(config *config.GlobalConfig, cache cache.MsgModel) *Client {
return &Client{cache: cache,
tokenExpireTime: tokenExpireTime,
taskIDTTL: taskIDTTL,
config: config,
}
}
func (g *Client) Push(ctx context.Context, userIDs []string, title, content string, opts *offlinepush.Opts) error {
@ -78,7 +83,7 @@ func (g *Client) Push(ctx context.Context, userIDs []string, title, content stri
return err
}
}
pushReq := newPushReq(title, content)
pushReq := newPushReq(g.config, title, content)
pushReq.setPushChannel(title, content)
if len(userIDs) > 1 {
maxNum := 999
@ -114,13 +119,13 @@ func (g *Client) Push(ctx context.Context, userIDs []string, title, content stri
func (g *Client) Auth(ctx context.Context, timeStamp int64) (token string, expireTime int64, err error) {
h := sha256.New()
h.Write(
[]byte(config.Config.Push.GeTui.AppKey + strconv.Itoa(int(timeStamp)) + config.Config.Push.GeTui.MasterSecret),
[]byte(g.config.Push.GeTui.AppKey + strconv.Itoa(int(timeStamp)) + g.config.Push.GeTui.MasterSecret),
)
sign := hex.EncodeToString(h.Sum(nil))
reqAuth := AuthReq{
Sign: sign,
Timestamp: strconv.Itoa(int(timeStamp)),
AppKey: config.Config.Push.GeTui.AppKey,
AppKey: g.config.Push.GeTui.AppKey,
}
respAuth := AuthResp{}
err = g.request(ctx, authURL, reqAuth, "", &respAuth)
@ -163,7 +168,7 @@ func (g *Client) request(ctx context.Context, url string, input any, token strin
header := map[string]string{"token": token}
resp := &Resp{}
resp.Data = output
return g.postReturn(ctx, config.Config.Push.GeTui.PushUrl+url, header, input, resp, 3)
return g.postReturn(ctx, g.config.Push.GeTui.PushUrl+url, header, input, resp, 3)
}
func (g *Client) postReturn(

@ -46,7 +46,6 @@ type Extras struct {
func (n *Notification) SetAlert(alert string) {
n.Alert = alert
n.Android.Alert = alert
n.SetAndroidIntent()
n.IOS.Alert = alert
n.IOS.Sound = "default"
n.IOS.Badge = "+1"
@ -57,8 +56,8 @@ func (n *Notification) SetExtras(extras Extras) {
n.Android.Extras = extras
}
func (n *Notification) SetAndroidIntent() {
n.Android.Intent.URL = config.Config.Push.Jpns.PushIntent
func (n *Notification) SetAndroidIntent(config *config.GlobalConfig) {
n.Android.Intent.URL = config.Push.Jpns.PushIntent
}
func (n *Notification) IOSEnableMutableContent() {

@ -25,10 +25,12 @@ import (
http2 "github.com/openimsdk/open-im-server/v3/pkg/common/http"
)
type JPush struct{}
type JPush struct {
config *config.GlobalConfig
}
func NewClient() *JPush {
return &JPush{}
func NewClient(config *config.GlobalConfig) *JPush {
return &JPush{config: config}
}
func (j *JPush) Auth(apiKey, secretKey string, timeStamp int64) (token string, err error) {
@ -59,10 +61,12 @@ func (j *JPush) Push(ctx context.Context, userIDs []string, title, content strin
no.IOSEnableMutableContent()
no.SetExtras(extras)
no.SetAlert(title)
no.SetAndroidIntent(j.config)
var msg body.Message
msg.SetMsgContent(content)
var opt body.Options
opt.SetApnsProduction(config.Config.IOSPush.Production)
opt.SetApnsProduction(j.config.IOSPush.Production)
var pushObj body.PushObj
pushObj.SetPlatform(&pf)
pushObj.SetAudience(&au)
@ -76,9 +80,9 @@ func (j *JPush) Push(ctx context.Context, userIDs []string, title, content strin
func (j *JPush) request(ctx context.Context, po body.PushObj, resp any, timeout int) error {
return http2.PostReturn(
ctx,
config.Config.Push.Jpns.PushUrl,
j.config.Push.Jpns.PushUrl,
map[string]string{
"Authorization": j.getAuthorization(config.Config.Push.Jpns.AppKey, config.Config.Push.Jpns.MasterSecret),
"Authorization": j.getAuthorization(j.config.Push.Jpns.AppKey, j.config.Push.Jpns.MasterSecret),
},
po,
resp,

@ -35,15 +35,15 @@ type ConsumerHandler struct {
pusher *Pusher
}
func NewConsumerHandler(pusher *Pusher) (*ConsumerHandler, error) {
func NewConsumerHandler(config *config.GlobalConfig, pusher *Pusher) (*ConsumerHandler, error) {
var consumerHandler ConsumerHandler
consumerHandler.pusher = pusher
var err error
consumerHandler.pushConsumerGroup, err = kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{
KafkaVersion: sarama.V2_0_0_0,
OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false,
}, []string{config.Config.Kafka.MsgToPush.Topic}, config.Config.Kafka.Addr,
config.Config.Kafka.ConsumerGroupID.MsgToPush)
}, []string{config.Kafka.MsgToPush.Topic}, config.Kafka.Addr,
config.Kafka.ConsumerGroupID.MsgToPush)
if err != nil {
return nil, err
}

@ -39,17 +39,18 @@ type pushServer struct {
}
func Start(config *config.GlobalConfig, client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) error {
rdb, err := cache.NewRedis()
rdb, err := cache.NewRedis(config)
if err != nil {
return err
}
cacheModel := cache.NewMsgCacheModel(rdb)
offlinePusher := NewOfflinePusher(cacheModel)
offlinePusher := NewOfflinePusher(config, cacheModel)
database := controller.NewPushDatabase(cacheModel)
groupRpcClient := rpcclient.NewGroupRpcClient(client)
conversationRpcClient := rpcclient.NewConversationRpcClient(client)
msgRpcClient := rpcclient.NewMessageRpcClient(client)
pusher := NewPusher(
config,
client,
offlinePusher,
database,
@ -65,7 +66,7 @@ func Start(config *config.GlobalConfig, client discoveryregistry.SvcDiscoveryReg
config: config,
})
consumer, err := NewConsumer(pusher)
consumer, err := NewConsumer(config, pusher)
if err != nil {
return err
}

@ -48,6 +48,7 @@ import (
)
type Pusher struct {
config *config.GlobalConfig
database controller.PushDatabase
discov discoveryregistry.SvcDiscoveryRegistry
offlinePusher offlinepush.OfflinePusher
@ -60,11 +61,12 @@ type Pusher struct {
var errNoOfflinePusher = errors.New("no offlinePusher is configured")
func NewPusher(discov discoveryregistry.SvcDiscoveryRegistry, offlinePusher offlinepush.OfflinePusher, database controller.PushDatabase,
func NewPusher(config *config.GlobalConfig, discov discoveryregistry.SvcDiscoveryRegistry, offlinePusher offlinepush.OfflinePusher, database controller.PushDatabase,
groupLocalCache *localcache.GroupLocalCache, conversationLocalCache *localcache.ConversationLocalCache,
conversationRpcClient *rpcclient.ConversationRpcClient, groupRpcClient *rpcclient.GroupRpcClient, msgRpcClient *rpcclient.MessageRpcClient,
) *Pusher {
return &Pusher{
config: config,
discov: discov,
database: database,
offlinePusher: offlinePusher,
@ -76,15 +78,15 @@ func NewPusher(discov discoveryregistry.SvcDiscoveryRegistry, offlinePusher offl
}
}
func NewOfflinePusher(cache cache.MsgModel) offlinepush.OfflinePusher {
func NewOfflinePusher(config *config.GlobalConfig, cache cache.MsgModel) offlinepush.OfflinePusher {
var offlinePusher offlinepush.OfflinePusher
switch config.Config.Push.Enable {
switch config.Push.Enable {
case "getui":
offlinePusher = getui.NewClient(cache)
offlinePusher = getui.NewClient(config, cache)
case "fcm":
offlinePusher = fcm.NewClient(cache)
offlinePusher = fcm.NewClient(config, cache)
case "jpush":
offlinePusher = jpush.NewClient()
offlinePusher = jpush.NewClient(config)
default:
offlinePusher = dummy.NewClient()
}
@ -102,7 +104,7 @@ func (p *Pusher) DeleteMemberAndSetConversationSeq(ctx context.Context, groupID
func (p *Pusher) Push2User(ctx context.Context, userIDs []string, msg *sdkws.MsgData) error {
log.ZDebug(ctx, "Get msg from msg_transfer And push msg", "userIDs", userIDs, "msg", msg.String())
if err := callbackOnlinePush(ctx, userIDs, msg); err != nil {
if err := callbackOnlinePush(ctx, p.config, userIDs, msg); err != nil {
return err
}
// push
@ -130,7 +132,7 @@ func (p *Pusher) Push2User(ctx context.Context, userIDs []string, msg *sdkws.Msg
})
if len(offlinePushUserIDList) > 0 {
if err = callbackOfflinePush(ctx, offlinePushUserIDList, msg, &[]string{}); err != nil {
if err = callbackOfflinePush(ctx, p.config, offlinePushUserIDList, msg, &[]string{}); err != nil {
return err
}
err = p.offlinePushMsg(ctx, msg.SendID, msg, offlinePushUserIDList)
@ -163,7 +165,7 @@ func (p *Pusher) k8sOfflinePush2SuperGroup(ctx context.Context, groupID string,
}
if len(needOfflinePushUserIDs) > 0 {
var offlinePushUserIDs []string
err := callbackOfflinePush(ctx, needOfflinePushUserIDs, msg, &offlinePushUserIDs)
err := callbackOfflinePush(ctx, p.config, needOfflinePushUserIDs, msg, &offlinePushUserIDs)
if err != nil {
return err
}
@ -194,7 +196,7 @@ func (p *Pusher) k8sOfflinePush2SuperGroup(ctx context.Context, groupID string,
func (p *Pusher) Push2SuperGroup(ctx context.Context, groupID string, msg *sdkws.MsgData) (err error) {
log.ZDebug(ctx, "Get super group msg from msg_transfer and push msg", "msg", msg.String(), "groupID", groupID)
var pushToUserIDs []string
if err = callbackBeforeSuperGroupOnlinePush(ctx, groupID, msg, &pushToUserIDs); err != nil {
if err = callbackBeforeSuperGroupOnlinePush(ctx, p.config, groupID, msg, &pushToUserIDs); err != nil {
return err
}
@ -235,11 +237,11 @@ func (p *Pusher) Push2SuperGroup(ctx context.Context, groupID string, msg *sdkws
return err
}
log.ZInfo(ctx, "GroupDismissedNotificationInfo****", "groupID", groupID, "num", len(pushToUserIDs), "list", pushToUserIDs)
if len(config.Config.Manager.UserID) > 0 {
ctx = mcontext.WithOpUserIDContext(ctx, config.Config.Manager.UserID[0])
if len(p.config.Manager.UserID) > 0 {
ctx = mcontext.WithOpUserIDContext(ctx, p.config.Manager.UserID[0])
}
if len(config.Config.Manager.UserID) == 0 && len(config.Config.IMAdmin.UserID) > 0 {
ctx = mcontext.WithOpUserIDContext(ctx, config.Config.IMAdmin.UserID[0])
if len(p.config.Manager.UserID) == 0 && len(p.config.IMAdmin.UserID) > 0 {
ctx = mcontext.WithOpUserIDContext(ctx, p.config.IMAdmin.UserID[0])
}
defer func(groupID string) {
if err = p.groupRpcClient.DismissGroup(ctx, groupID); err != nil {
@ -257,10 +259,10 @@ func (p *Pusher) Push2SuperGroup(ctx context.Context, groupID string, msg *sdkws
log.ZDebug(ctx, "get conn and online push success", "result", wsResults, "msg", msg)
isOfflinePush := utils.GetSwitchFromOptions(msg.Options, constant.IsOfflinePush)
if isOfflinePush && config.Config.Envs.Discovery == "k8s" {
if isOfflinePush && p.config.Envs.Discovery == "k8s" {
return p.k8sOfflinePush2SuperGroup(ctx, groupID, msg, wsResults)
}
if isOfflinePush && config.Config.Envs.Discovery == "zookeeper" {
if isOfflinePush && p.config.Envs.Discovery == "zookeeper" {
var (
onlineSuccessUserIDs = []string{msg.SendID}
webAndPcBackgroundUserIDs []string
@ -298,7 +300,7 @@ func (p *Pusher) Push2SuperGroup(ctx context.Context, groupID string, msg *sdkws
// Use offline push messaging
if len(needOfflinePushUserIDs) > 0 {
var offlinePushUserIDs []string
err = callbackOfflinePush(ctx, needOfflinePushUserIDs, msg, &offlinePushUserIDs)
err = callbackOfflinePush(ctx, p.config, needOfflinePushUserIDs, msg, &offlinePushUserIDs)
if err != nil {
return err
}
@ -357,7 +359,7 @@ func (p *Pusher) k8sOnlinePush(ctx context.Context, msg *sdkws.MsgData, pushToUs
var (
mu sync.Mutex
wg = errgroup.Group{}
maxWorkers = config.Config.Push.MaxConcurrentWorkers
maxWorkers = p.config.Push.MaxConcurrentWorkers
)
if maxWorkers < 3 {
maxWorkers = 3
@ -386,10 +388,10 @@ func (p *Pusher) k8sOnlinePush(ctx context.Context, msg *sdkws.MsgData, pushToUs
return wsResults, nil
}
func (p *Pusher) GetConnsAndOnlinePush(ctx context.Context, msg *sdkws.MsgData, pushToUserIDs []string) (wsResults []*msggateway.SingleMsgToUserResults, err error) {
if config.Config.Envs.Discovery == "k8s" {
if p.config.Envs.Discovery == "k8s" {
return p.k8sOnlinePush(ctx, msg, pushToUserIDs)
}
conns, err := p.discov.GetConns(ctx, config.Config.RpcRegisterName.OpenImMessageGatewayName)
conns, err := p.discov.GetConns(ctx, p.config.RpcRegisterName.OpenImMessageGatewayName)
log.ZDebug(ctx, "get gateway conn", "conn length", len(conns))
if err != nil {
return nil, err
@ -399,7 +401,7 @@ func (p *Pusher) GetConnsAndOnlinePush(ctx context.Context, msg *sdkws.MsgData,
mu sync.Mutex
wg = errgroup.Group{}
input = &msggateway.OnlineBatchPushOneMsgReq{MsgData: msg, PushToUserIDs: pushToUserIDs}
maxWorkers = config.Config.Push.MaxConcurrentWorkers
maxWorkers = p.config.Push.MaxConcurrentWorkers
)
if maxWorkers < 3 {

@ -40,7 +40,7 @@ const (
)
// NewRedis Initialize redis connection.
func NewRedis() (redis.UniversalClient, error) {
func NewRedis(config *config.GlobalConfig) (redis.UniversalClient, error) {
if redisClient != nil {
return redisClient, nil
}
@ -48,24 +48,24 @@ func NewRedis() (redis.UniversalClient, error) {
// Read configuration from environment variables
overrideConfigFromEnv()
if len(config.Config.Redis.Address) == 0 {
if len(config.Redis.Address) == 0 {
return nil, errs.Wrap(errors.New("redis address is empty"))
}
specialerror.AddReplace(redis.Nil, errs.ErrRecordNotFound)
var rdb redis.UniversalClient
if len(config.Config.Redis.Address) > 1 || config.Config.Redis.ClusterMode {
if len(config.Redis.Address) > 1 || config.Redis.ClusterMode {
rdb = redis.NewClusterClient(&redis.ClusterOptions{
Addrs: config.Config.Redis.Address,
Username: config.Config.Redis.Username,
Password: config.Config.Redis.Password, // no password set
Addrs: config.Redis.Address,
Username: config.Redis.Username,
Password: config.Redis.Password, // no password set
PoolSize: 50,
MaxRetries: maxRetry,
})
} else {
rdb = redis.NewClient(&redis.Options{
Addr: config.Config.Redis.Address[0],
Username: config.Config.Redis.Username,
Password: config.Config.Redis.Password,
Addr: config.Redis.Address[0],
Username: config.Redis.Username,
Password: config.Redis.Password,
DB: 0, // use default DB
PoolSize: 100, // connection pool size
MaxRetries: maxRetry,
@ -78,7 +78,8 @@ func NewRedis() (redis.UniversalClient, error) {
err = rdb.Ping(ctx).Err()
if err != nil {
uriFormat := "address:%s, username:%s, password:%s, clusterMode:%t, enablePipeline:%t"
errMsg := fmt.Sprintf(uriFormat, config.Config.Redis.Address, config.Config.Redis.Username, config.Config.Redis.Password, config.Config.Redis.ClusterMode, config.Config.Redis.EnablePipeline)
errMsg := fmt.Sprintf(uriFormat, config.Redis.Address, config.Redis.Username,
config.Redis.Password, config.Redis.ClusterMode, config.Redis.EnablePipeline)
return nil, errs.Wrap(err, errMsg)
}
redisClient = rdb

Loading…
Cancel
Save