diff --git a/internal/api/route.go b/internal/api/route.go index 0a7a9ed86..5cb94507f 100644 --- a/internal/api/route.go +++ b/internal/api/route.go @@ -246,8 +246,8 @@ func newGinRouter(disCov discoveryregistry.SvcDiscoveryRegistry, rdb redis.Unive // Third service thirdGroup := r.Group("/third", ParseToken) { - thirdGroup.GET("/prometheus", GetPrometheus) t := NewThirdApi(*thirdRpc) + thirdGroup.GET("/prometheus", t.GetPrometheus) thirdGroup.POST("/fcm_update_token", t.FcmUpdateToken) thirdGroup.POST("/set_app_badge", t.SetAppBadge) diff --git a/internal/api/third.go b/internal/api/third.go index 0a1ef0fbe..984faaa20 100644 --- a/internal/api/third.go +++ b/internal/api/third.go @@ -19,8 +19,6 @@ import ( "net/http" "strconv" - config2 "github.com/openimsdk/open-im-server/v3/pkg/common/config" - "github.com/gin-gonic/gin" "github.com/OpenIMSDK/protocol/third" @@ -129,6 +127,6 @@ func (o *ThirdApi) SearchLogs(c *gin.Context) { a2r.Call(third.ThirdClient.SearchLogs, o.Client, c) } -func GetPrometheus(c *gin.Context) { - c.Redirect(http.StatusFound, config2.Config.Prometheus.GrafanaUrl) +func (o *ThirdApi) GetPrometheus(c *gin.Context) { + c.Redirect(http.StatusFound, o.Config.Prometheus.GrafanaUrl) } diff --git a/internal/msgtransfer/init.go b/internal/msgtransfer/init.go index a9f96a8e9..29b510110 100644 --- a/internal/msgtransfer/init.go +++ b/internal/msgtransfer/init.go @@ -49,6 +49,7 @@ type MsgTransfer struct { historyMongoCH *OnlineHistoryMongoConsumerHandler // mongoDB批量插入, 成功后删除redis中消息,以及处理删除通知消息删除的 订阅的topic: msg_to_mongo ctx context.Context cancel context.CancelFunc + config *config.GlobalConfig } func StartTransfer(config *config.GlobalConfig, prometheusPort int) error { @@ -102,6 +103,7 @@ func NewMsgTransfer(config *config.GlobalConfig, msgDatabase controller.CommonMs return &MsgTransfer{ historyCH: historyCH, historyMongoCH: historyMongoCH, + config: config, }, nil } @@ -126,7 +128,7 @@ func (m *MsgTransfer) Start(prometheusPort int, config *config.GlobalConfig) err proreg.MustRegister( collectors.NewGoCollector(), ) - proreg.MustRegister(prommetrics.GetGrpcCusMetrics("Transfer")...) + proreg.MustRegister(prommetrics.GetGrpcCusMetrics("Transfer", config)...) http.Handle("/metrics", promhttp.HandlerFor(proreg, promhttp.HandlerOpts{Registry: proreg})) err := http.ListenAndServe(fmt.Sprintf(":%d", prometheusPort), nil) if err != nil && err != http.ErrServerClosed { diff --git a/internal/tools/cron_task_test.go b/internal/tools/cron_task_test.go index d88020e9a..fcae5a5f6 100644 --- a/internal/tools/cron_task_test.go +++ b/internal/tools/cron_task_test.go @@ -15,8 +15,12 @@ package tools import ( + "flag" "fmt" + "github.com/OpenIMSDK/tools/errs" + "gopkg.in/yaml.v3" "math/rand" + "os" "sync" "testing" "time" @@ -71,7 +75,11 @@ func TestCronWrapFunc(t *testing.T) { } func TestCronWrapFuncWithNetlock(t *testing.T) { - config.Config.EnableCronLocker = true + conf, err := initCfg() + if err != nil { + panic(err) + } + conf.EnableCronLocker = true rdb := redis.NewClient(&redis.Options{}) defer rdb.Close() @@ -80,10 +88,10 @@ func TestCronWrapFuncWithNetlock(t *testing.T) { crontab := cron.New(cron.WithSeconds()) key := fmt.Sprintf("cron-%v", rand.Int31()) - crontab.AddFunc("*/1 * * * * *", cronWrapFunc(config.NewGlobalConfig(), rdb, key, func() { + crontab.AddFunc("*/1 * * * * *", cronWrapFunc(conf, rdb, key, func() { done <- "host1" })) - crontab.AddFunc("*/1 * * * * *", cronWrapFunc(config.NewGlobalConfig(), rdb, key, func() { + crontab.AddFunc("*/1 * * * * *", cronWrapFunc(conf, rdb, key, func() { done <- "host2" })) crontab.Start() @@ -94,3 +102,22 @@ func TestCronWrapFuncWithNetlock(t *testing.T) { crontab.Stop() } + +func initCfg() (*config.GlobalConfig, error) { + const ( + defaultCfgPath = "../../../../../config/config.yaml" + ) + + cfgPath := flag.String("c", defaultCfgPath, "Path to the configuration file") + data, err := os.ReadFile(*cfgPath) + if err != nil { + return nil, errs.Wrap(err, "ReadFile unmarshal failed") + } + + conf := config.NewGlobalConfig() + err = yaml.Unmarshal(data, &conf) + if err != nil { + return nil, errs.Wrap(err, "InitConfig unmarshal failed") + } + return conf, nil +} diff --git a/pkg/common/db/s3/aws/aws.go b/pkg/common/db/s3/aws/aws.go index d996485c7..76cef8ef1 100644 --- a/pkg/common/db/s3/aws/aws.go +++ b/pkg/common/db/s3/aws/aws.go @@ -54,11 +54,15 @@ const ( // ) func NewAWS() (s3.Interface, error) { - conf := config.Config.Object.Aws + configGlobal := config.NewGlobalConfig() + + config.InitConfig(configGlobal, "../../config") + + conf := configGlobal.Object.Aws credential := credentials.NewStaticCredentials( conf.AccessKeyID, // accessKey conf.AccessKeySecret, // secretKey - "") // sts的临时凭证 + "") // sts的临时凭证 sess, err := session.NewSession(&aws.Config{ Region: aws.String(conf.Region), // 桶所在的区域 diff --git a/pkg/common/discoveryregister/direct/directconn.go b/pkg/common/discoveryregister/direct/directconn.go index 84f173ea6..3493a86bf 100644 --- a/pkg/common/discoveryregister/direct/directconn.go +++ b/pkg/common/discoveryregister/direct/directconn.go @@ -28,17 +28,17 @@ import ( type ServiceAddresses map[string][]int -func getServiceAddresses() ServiceAddresses { +func getServiceAddresses(config *config2.GlobalConfig) ServiceAddresses { return ServiceAddresses{ - config2.Config.RpcRegisterName.OpenImUserName: config2.Config.RpcPort.OpenImUserPort, - config2.Config.RpcRegisterName.OpenImFriendName: config2.Config.RpcPort.OpenImFriendPort, - config2.Config.RpcRegisterName.OpenImMsgName: config2.Config.RpcPort.OpenImMessagePort, - config2.Config.RpcRegisterName.OpenImMessageGatewayName: config2.Config.LongConnSvr.OpenImMessageGatewayPort, - config2.Config.RpcRegisterName.OpenImGroupName: config2.Config.RpcPort.OpenImGroupPort, - config2.Config.RpcRegisterName.OpenImAuthName: config2.Config.RpcPort.OpenImAuthPort, - config2.Config.RpcRegisterName.OpenImPushName: config2.Config.RpcPort.OpenImPushPort, - config2.Config.RpcRegisterName.OpenImConversationName: config2.Config.RpcPort.OpenImConversationPort, - config2.Config.RpcRegisterName.OpenImThirdName: config2.Config.RpcPort.OpenImThirdPort, + config.RpcRegisterName.OpenImUserName: config.RpcPort.OpenImUserPort, + config.RpcRegisterName.OpenImFriendName: config.RpcPort.OpenImFriendPort, + config.RpcRegisterName.OpenImMsgName: config.RpcPort.OpenImMessagePort, + config.RpcRegisterName.OpenImMessageGatewayName: config.LongConnSvr.OpenImMessageGatewayPort, + config.RpcRegisterName.OpenImGroupName: config.RpcPort.OpenImGroupPort, + config.RpcRegisterName.OpenImAuthName: config.RpcPort.OpenImAuthPort, + config.RpcRegisterName.OpenImPushName: config.RpcPort.OpenImPushPort, + config.RpcRegisterName.OpenImConversationName: config.RpcPort.OpenImConversationPort, + config.RpcRegisterName.OpenImThirdName: config.RpcPort.OpenImThirdPort, } } @@ -47,6 +47,7 @@ type ConnDirect struct { currentServiceAddress string conns map[string][]*grpc.ClientConn resolverDirect *ResolverDirect + config *config2.GlobalConfig } func (cd *ConnDirect) GetClientLocalConns() map[string][]*grpc.ClientConn { @@ -81,10 +82,11 @@ func (cd *ConnDirect) Close() { } -func NewConnDirect() (*ConnDirect, error) { +func NewConnDirect(config *config2.GlobalConfig) (*ConnDirect, error) { return &ConnDirect{ conns: make(map[string][]*grpc.ClientConn), resolverDirect: NewResolverDirect(), + config: config, }, nil } @@ -94,12 +96,12 @@ func (cd *ConnDirect) GetConns(ctx context.Context, if conns, exists := cd.conns[serviceName]; exists { return conns, nil } - ports := getServiceAddresses()[serviceName] + ports := getServiceAddresses(cd.config)[serviceName] var connections []*grpc.ClientConn for _, port := range ports { - conn, err := cd.dialServiceWithoutResolver(ctx, fmt.Sprintf(config2.Config.Rpc.ListenIP+":%d", port), append(cd.additionalOpts, opts...)...) + conn, err := cd.dialServiceWithoutResolver(ctx, fmt.Sprintf(cd.config.Rpc.ListenIP+":%d", port), append(cd.additionalOpts, opts...)...) if err != nil { - fmt.Printf("connect to port %d failed,serviceName %s, IP %s\n", port, serviceName, config2.Config.Rpc.ListenIP) + fmt.Printf("connect to port %d failed,serviceName %s, IP %s\n", port, serviceName, cd.config.Rpc.ListenIP) } connections = append(connections, conn) } @@ -112,7 +114,7 @@ func (cd *ConnDirect) GetConns(ctx context.Context, func (cd *ConnDirect) GetConn(ctx context.Context, serviceName string, opts ...grpc.DialOption) (*grpc.ClientConn, error) { // Get service addresses - addresses := getServiceAddresses() + addresses := getServiceAddresses(cd.config) address, ok := addresses[serviceName] if !ok { return nil, errs.Wrap(errors.New("unknown service name"), "serviceName", serviceName) @@ -120,9 +122,9 @@ func (cd *ConnDirect) GetConn(ctx context.Context, serviceName string, opts ...g var result string for _, addr := range address { if result != "" { - result = result + "," + fmt.Sprintf(config2.Config.Rpc.ListenIP+":%d", addr) + result = result + "," + fmt.Sprintf(cd.config.Rpc.ListenIP+":%d", addr) } else { - result = fmt.Sprintf(config2.Config.Rpc.ListenIP+":%d", addr) + result = fmt.Sprintf(cd.config.Rpc.ListenIP+":%d", addr) } } // Try to dial a new connection diff --git a/pkg/common/discoveryregister/discoveryregister.go b/pkg/common/discoveryregister/discoveryregister.go index 36785c4b0..c286d6b4a 100644 --- a/pkg/common/discoveryregister/discoveryregister.go +++ b/pkg/common/discoveryregister/discoveryregister.go @@ -40,7 +40,7 @@ func NewDiscoveryRegister(config *config.GlobalConfig) (discoveryregistry.SvcDis case "k8s": return kubernetes.NewK8sDiscoveryRegister(config.RpcRegisterName.OpenImMessageGatewayName) case "direct": - return direct.NewConnDirect() + return direct.NewConnDirect(config) default: return nil, errors.New("envType not correct") } diff --git a/pkg/common/prommetrics/prommetrics.go b/pkg/common/prommetrics/prommetrics.go index b7c5e07f4..a4515c74f 100644 --- a/pkg/common/prommetrics/prommetrics.go +++ b/pkg/common/prommetrics/prommetrics.go @@ -33,17 +33,17 @@ func NewGrpcPromObj(cusMetrics []prometheus.Collector) (*prometheus.Registry, *g return reg, grpcMetrics, nil } -func GetGrpcCusMetrics(registerName string) []prometheus.Collector { +func GetGrpcCusMetrics(registerName string, config *config2.GlobalConfig) []prometheus.Collector { switch registerName { - case config2.Config.RpcRegisterName.OpenImMessageGatewayName: + case config.RpcRegisterName.OpenImMessageGatewayName: return []prometheus.Collector{OnlineUserGauge} - case config2.Config.RpcRegisterName.OpenImMsgName: + case config.RpcRegisterName.OpenImMsgName: return []prometheus.Collector{SingleChatMsgProcessSuccessCounter, SingleChatMsgProcessFailedCounter, GroupChatMsgProcessSuccessCounter, GroupChatMsgProcessFailedCounter} case "Transfer": return []prometheus.Collector{MsgInsertRedisSuccessCounter, MsgInsertRedisFailedCounter, MsgInsertMongoSuccessCounter, MsgInsertMongoFailedCounter, SeqSetFailedCounter} - case config2.Config.RpcRegisterName.OpenImPushName: + case config.RpcRegisterName.OpenImPushName: return []prometheus.Collector{MsgOfflinePushFailedCounter} - case config2.Config.RpcRegisterName.OpenImAuthName: + case config.RpcRegisterName.OpenImAuthName: return []prometheus.Collector{UserLoginCounter} default: return nil diff --git a/pkg/common/prommetrics/prommetrics_test.go b/pkg/common/prommetrics/prommetrics_test.go index 1e48c63ba..eb6f3c771 100644 --- a/pkg/common/prommetrics/prommetrics_test.go +++ b/pkg/common/prommetrics/prommetrics_test.go @@ -58,17 +58,20 @@ func TestNewGrpcPromObj(t *testing.T) { } func TestGetGrpcCusMetrics(t *testing.T) { + conf := config2.NewGlobalConfig() + + config2.InitConfig(conf, "../../config") // Test various cases based on the switch statement in the GetGrpcCusMetrics function. testCases := []struct { name string expected int // The expected number of metrics for each case. }{ - {config2.Config.RpcRegisterName.OpenImMessageGatewayName, 1}, + {conf.RpcRegisterName.OpenImMessageGatewayName, 1}, } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - metrics := GetGrpcCusMetrics(tc.name) + metrics := GetGrpcCusMetrics(tc.name, conf) assert.Len(t, metrics, tc.expected) }) } diff --git a/pkg/common/startrpc/start.go b/pkg/common/startrpc/start.go index 105873c98..5109adcab 100644 --- a/pkg/common/startrpc/start.go +++ b/pkg/common/startrpc/start.go @@ -85,7 +85,7 @@ func Start( var reg *prometheus.Registry var metric *grpcprometheus.ServerMetrics if config.Prometheus.Enable { - cusMetrics := prommetrics.GetGrpcCusMetrics(rpcRegisterName) + cusMetrics := prommetrics.GetGrpcCusMetrics(rpcRegisterName, config) reg, metric, _ = prommetrics.NewGrpcPromObj(cusMetrics) options = append(options, mw.GrpcServer(), grpc.StreamInterceptor(metric.StreamServerInterceptor()), grpc.UnaryInterceptor(metric.UnaryServerInterceptor())) diff --git a/tools/component/component_test.go b/tools/component/component_test.go index 7fd46e9dd..c56361b2c 100644 --- a/tools/component/component_test.go +++ b/tools/component/component_test.go @@ -21,18 +21,8 @@ import ( "time" "github.com/redis/go-redis/v9" - - "github.com/openimsdk/open-im-server/v3/pkg/common/config" ) -// Mock for initCfg for testing purpose -func mockInitCfg() error { - config.Config.Mysql.Username = "root" - config.Config.Mysql.Password = "openIM123" - config.Config.Mysql.Address = []string{"127.0.0.1:13306"} - return nil -} - func TestRedis(t *testing.T) { conf, err := initCfg() conf.Redis.Address = []string{ diff --git a/tools/up35/pkg/pkg.go b/tools/up35/pkg/pkg.go index 4247d694c..1348172d2 100644 --- a/tools/up35/pkg/pkg.go +++ b/tools/up35/pkg/pkg.go @@ -79,7 +79,7 @@ func Main(path string) error { if err != nil { return err } - if config.Config.Mysql == nil { + if conf.Mysql == nil { return nil } mongoDB, err := GetMongo(conf) @@ -121,7 +121,7 @@ func Main(path string) error { func() error { return NewTask(mysqlDB, mongoDB, mgo.NewGroupMember, c.GroupMember) }, func() error { return NewTask(mysqlDB, mongoDB, mgo.NewGroupRequestMgo, c.GroupRequest) }, func() error { return NewTask(mysqlDB, mongoDB, mgo.NewConversationMongo, c.Conversation) }, - func() error { return NewTask(mysqlDB, mongoDB, mgo.NewS3Mongo, c.Object(config.Config.Object.Enable)) }, + func() error { return NewTask(mysqlDB, mongoDB, mgo.NewS3Mongo, c.Object(conf.Object.Enable)) }, func() error { return NewTask(mysqlDB, mongoDB, mgo.NewLogMongo, c.Log) }, func() error { return NewTask(mysqlDB, mongoDB, rtcmgo.NewSignal, c.SignalModel) },