fix: fix the config

pull/1987/head
luhaoling 2 years ago
parent 91755db8cd
commit 62134aa6fc

@ -246,8 +246,8 @@ func newGinRouter(disCov discoveryregistry.SvcDiscoveryRegistry, rdb redis.Unive
// Third service // Third service
thirdGroup := r.Group("/third", ParseToken) thirdGroup := r.Group("/third", ParseToken)
{ {
thirdGroup.GET("/prometheus", GetPrometheus)
t := NewThirdApi(*thirdRpc) t := NewThirdApi(*thirdRpc)
thirdGroup.GET("/prometheus", t.GetPrometheus)
thirdGroup.POST("/fcm_update_token", t.FcmUpdateToken) thirdGroup.POST("/fcm_update_token", t.FcmUpdateToken)
thirdGroup.POST("/set_app_badge", t.SetAppBadge) thirdGroup.POST("/set_app_badge", t.SetAppBadge)

@ -19,8 +19,6 @@ import (
"net/http" "net/http"
"strconv" "strconv"
config2 "github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
"github.com/OpenIMSDK/protocol/third" "github.com/OpenIMSDK/protocol/third"
@ -129,6 +127,6 @@ func (o *ThirdApi) SearchLogs(c *gin.Context) {
a2r.Call(third.ThirdClient.SearchLogs, o.Client, c) a2r.Call(third.ThirdClient.SearchLogs, o.Client, c)
} }
func GetPrometheus(c *gin.Context) { func (o *ThirdApi) GetPrometheus(c *gin.Context) {
c.Redirect(http.StatusFound, config2.Config.Prometheus.GrafanaUrl) c.Redirect(http.StatusFound, o.Config.Prometheus.GrafanaUrl)
} }

@ -49,6 +49,7 @@ type MsgTransfer struct {
historyMongoCH *OnlineHistoryMongoConsumerHandler // mongoDB批量插入, 成功后删除redis中消息以及处理删除通知消息删除的 订阅的topic: msg_to_mongo historyMongoCH *OnlineHistoryMongoConsumerHandler // mongoDB批量插入, 成功后删除redis中消息以及处理删除通知消息删除的 订阅的topic: msg_to_mongo
ctx context.Context ctx context.Context
cancel context.CancelFunc cancel context.CancelFunc
config *config.GlobalConfig
} }
func StartTransfer(config *config.GlobalConfig, prometheusPort int) error { func StartTransfer(config *config.GlobalConfig, prometheusPort int) error {
@ -102,6 +103,7 @@ func NewMsgTransfer(config *config.GlobalConfig, msgDatabase controller.CommonMs
return &MsgTransfer{ return &MsgTransfer{
historyCH: historyCH, historyCH: historyCH,
historyMongoCH: historyMongoCH, historyMongoCH: historyMongoCH,
config: config,
}, nil }, nil
} }
@ -126,7 +128,7 @@ func (m *MsgTransfer) Start(prometheusPort int, config *config.GlobalConfig) err
proreg.MustRegister( proreg.MustRegister(
collectors.NewGoCollector(), collectors.NewGoCollector(),
) )
proreg.MustRegister(prommetrics.GetGrpcCusMetrics("Transfer")...) proreg.MustRegister(prommetrics.GetGrpcCusMetrics("Transfer", config)...)
http.Handle("/metrics", promhttp.HandlerFor(proreg, promhttp.HandlerOpts{Registry: proreg})) http.Handle("/metrics", promhttp.HandlerFor(proreg, promhttp.HandlerOpts{Registry: proreg}))
err := http.ListenAndServe(fmt.Sprintf(":%d", prometheusPort), nil) err := http.ListenAndServe(fmt.Sprintf(":%d", prometheusPort), nil)
if err != nil && err != http.ErrServerClosed { if err != nil && err != http.ErrServerClosed {

@ -15,8 +15,12 @@
package tools package tools
import ( import (
"flag"
"fmt" "fmt"
"github.com/OpenIMSDK/tools/errs"
"gopkg.in/yaml.v3"
"math/rand" "math/rand"
"os"
"sync" "sync"
"testing" "testing"
"time" "time"
@ -71,7 +75,11 @@ func TestCronWrapFunc(t *testing.T) {
} }
func TestCronWrapFuncWithNetlock(t *testing.T) { func TestCronWrapFuncWithNetlock(t *testing.T) {
config.Config.EnableCronLocker = true conf, err := initCfg()
if err != nil {
panic(err)
}
conf.EnableCronLocker = true
rdb := redis.NewClient(&redis.Options{}) rdb := redis.NewClient(&redis.Options{})
defer rdb.Close() defer rdb.Close()
@ -80,10 +88,10 @@ func TestCronWrapFuncWithNetlock(t *testing.T) {
crontab := cron.New(cron.WithSeconds()) crontab := cron.New(cron.WithSeconds())
key := fmt.Sprintf("cron-%v", rand.Int31()) key := fmt.Sprintf("cron-%v", rand.Int31())
crontab.AddFunc("*/1 * * * * *", cronWrapFunc(config.NewGlobalConfig(), rdb, key, func() { crontab.AddFunc("*/1 * * * * *", cronWrapFunc(conf, rdb, key, func() {
done <- "host1" done <- "host1"
})) }))
crontab.AddFunc("*/1 * * * * *", cronWrapFunc(config.NewGlobalConfig(), rdb, key, func() { crontab.AddFunc("*/1 * * * * *", cronWrapFunc(conf, rdb, key, func() {
done <- "host2" done <- "host2"
})) }))
crontab.Start() crontab.Start()
@ -94,3 +102,22 @@ func TestCronWrapFuncWithNetlock(t *testing.T) {
crontab.Stop() crontab.Stop()
} }
func initCfg() (*config.GlobalConfig, error) {
const (
defaultCfgPath = "../../../../../config/config.yaml"
)
cfgPath := flag.String("c", defaultCfgPath, "Path to the configuration file")
data, err := os.ReadFile(*cfgPath)
if err != nil {
return nil, errs.Wrap(err, "ReadFile unmarshal failed")
}
conf := config.NewGlobalConfig()
err = yaml.Unmarshal(data, &conf)
if err != nil {
return nil, errs.Wrap(err, "InitConfig unmarshal failed")
}
return conf, nil
}

@ -54,7 +54,11 @@ const (
// ) // )
func NewAWS() (s3.Interface, error) { func NewAWS() (s3.Interface, error) {
conf := config.Config.Object.Aws configGlobal := config.NewGlobalConfig()
config.InitConfig(configGlobal, "../../config")
conf := configGlobal.Object.Aws
credential := credentials.NewStaticCredentials( credential := credentials.NewStaticCredentials(
conf.AccessKeyID, // accessKey conf.AccessKeyID, // accessKey
conf.AccessKeySecret, // secretKey conf.AccessKeySecret, // secretKey

@ -28,17 +28,17 @@ import (
type ServiceAddresses map[string][]int type ServiceAddresses map[string][]int
func getServiceAddresses() ServiceAddresses { func getServiceAddresses(config *config2.GlobalConfig) ServiceAddresses {
return ServiceAddresses{ return ServiceAddresses{
config2.Config.RpcRegisterName.OpenImUserName: config2.Config.RpcPort.OpenImUserPort, config.RpcRegisterName.OpenImUserName: config.RpcPort.OpenImUserPort,
config2.Config.RpcRegisterName.OpenImFriendName: config2.Config.RpcPort.OpenImFriendPort, config.RpcRegisterName.OpenImFriendName: config.RpcPort.OpenImFriendPort,
config2.Config.RpcRegisterName.OpenImMsgName: config2.Config.RpcPort.OpenImMessagePort, config.RpcRegisterName.OpenImMsgName: config.RpcPort.OpenImMessagePort,
config2.Config.RpcRegisterName.OpenImMessageGatewayName: config2.Config.LongConnSvr.OpenImMessageGatewayPort, config.RpcRegisterName.OpenImMessageGatewayName: config.LongConnSvr.OpenImMessageGatewayPort,
config2.Config.RpcRegisterName.OpenImGroupName: config2.Config.RpcPort.OpenImGroupPort, config.RpcRegisterName.OpenImGroupName: config.RpcPort.OpenImGroupPort,
config2.Config.RpcRegisterName.OpenImAuthName: config2.Config.RpcPort.OpenImAuthPort, config.RpcRegisterName.OpenImAuthName: config.RpcPort.OpenImAuthPort,
config2.Config.RpcRegisterName.OpenImPushName: config2.Config.RpcPort.OpenImPushPort, config.RpcRegisterName.OpenImPushName: config.RpcPort.OpenImPushPort,
config2.Config.RpcRegisterName.OpenImConversationName: config2.Config.RpcPort.OpenImConversationPort, config.RpcRegisterName.OpenImConversationName: config.RpcPort.OpenImConversationPort,
config2.Config.RpcRegisterName.OpenImThirdName: config2.Config.RpcPort.OpenImThirdPort, config.RpcRegisterName.OpenImThirdName: config.RpcPort.OpenImThirdPort,
} }
} }
@ -47,6 +47,7 @@ type ConnDirect struct {
currentServiceAddress string currentServiceAddress string
conns map[string][]*grpc.ClientConn conns map[string][]*grpc.ClientConn
resolverDirect *ResolverDirect resolverDirect *ResolverDirect
config *config2.GlobalConfig
} }
func (cd *ConnDirect) GetClientLocalConns() map[string][]*grpc.ClientConn { func (cd *ConnDirect) GetClientLocalConns() map[string][]*grpc.ClientConn {
@ -81,10 +82,11 @@ func (cd *ConnDirect) Close() {
} }
func NewConnDirect() (*ConnDirect, error) { func NewConnDirect(config *config2.GlobalConfig) (*ConnDirect, error) {
return &ConnDirect{ return &ConnDirect{
conns: make(map[string][]*grpc.ClientConn), conns: make(map[string][]*grpc.ClientConn),
resolverDirect: NewResolverDirect(), resolverDirect: NewResolverDirect(),
config: config,
}, nil }, nil
} }
@ -94,12 +96,12 @@ func (cd *ConnDirect) GetConns(ctx context.Context,
if conns, exists := cd.conns[serviceName]; exists { if conns, exists := cd.conns[serviceName]; exists {
return conns, nil return conns, nil
} }
ports := getServiceAddresses()[serviceName] ports := getServiceAddresses(cd.config)[serviceName]
var connections []*grpc.ClientConn var connections []*grpc.ClientConn
for _, port := range ports { for _, port := range ports {
conn, err := cd.dialServiceWithoutResolver(ctx, fmt.Sprintf(config2.Config.Rpc.ListenIP+":%d", port), append(cd.additionalOpts, opts...)...) conn, err := cd.dialServiceWithoutResolver(ctx, fmt.Sprintf(cd.config.Rpc.ListenIP+":%d", port), append(cd.additionalOpts, opts...)...)
if err != nil { if err != nil {
fmt.Printf("connect to port %d failed,serviceName %s, IP %s\n", port, serviceName, config2.Config.Rpc.ListenIP) fmt.Printf("connect to port %d failed,serviceName %s, IP %s\n", port, serviceName, cd.config.Rpc.ListenIP)
} }
connections = append(connections, conn) connections = append(connections, conn)
} }
@ -112,7 +114,7 @@ func (cd *ConnDirect) GetConns(ctx context.Context,
func (cd *ConnDirect) GetConn(ctx context.Context, serviceName string, opts ...grpc.DialOption) (*grpc.ClientConn, error) { func (cd *ConnDirect) GetConn(ctx context.Context, serviceName string, opts ...grpc.DialOption) (*grpc.ClientConn, error) {
// Get service addresses // Get service addresses
addresses := getServiceAddresses() addresses := getServiceAddresses(cd.config)
address, ok := addresses[serviceName] address, ok := addresses[serviceName]
if !ok { if !ok {
return nil, errs.Wrap(errors.New("unknown service name"), "serviceName", serviceName) return nil, errs.Wrap(errors.New("unknown service name"), "serviceName", serviceName)
@ -120,9 +122,9 @@ func (cd *ConnDirect) GetConn(ctx context.Context, serviceName string, opts ...g
var result string var result string
for _, addr := range address { for _, addr := range address {
if result != "" { if result != "" {
result = result + "," + fmt.Sprintf(config2.Config.Rpc.ListenIP+":%d", addr) result = result + "," + fmt.Sprintf(cd.config.Rpc.ListenIP+":%d", addr)
} else { } else {
result = fmt.Sprintf(config2.Config.Rpc.ListenIP+":%d", addr) result = fmt.Sprintf(cd.config.Rpc.ListenIP+":%d", addr)
} }
} }
// Try to dial a new connection // Try to dial a new connection

@ -40,7 +40,7 @@ func NewDiscoveryRegister(config *config.GlobalConfig) (discoveryregistry.SvcDis
case "k8s": case "k8s":
return kubernetes.NewK8sDiscoveryRegister(config.RpcRegisterName.OpenImMessageGatewayName) return kubernetes.NewK8sDiscoveryRegister(config.RpcRegisterName.OpenImMessageGatewayName)
case "direct": case "direct":
return direct.NewConnDirect() return direct.NewConnDirect(config)
default: default:
return nil, errors.New("envType not correct") return nil, errors.New("envType not correct")
} }

@ -33,17 +33,17 @@ func NewGrpcPromObj(cusMetrics []prometheus.Collector) (*prometheus.Registry, *g
return reg, grpcMetrics, nil return reg, grpcMetrics, nil
} }
func GetGrpcCusMetrics(registerName string) []prometheus.Collector { func GetGrpcCusMetrics(registerName string, config *config2.GlobalConfig) []prometheus.Collector {
switch registerName { switch registerName {
case config2.Config.RpcRegisterName.OpenImMessageGatewayName: case config.RpcRegisterName.OpenImMessageGatewayName:
return []prometheus.Collector{OnlineUserGauge} return []prometheus.Collector{OnlineUserGauge}
case config2.Config.RpcRegisterName.OpenImMsgName: case config.RpcRegisterName.OpenImMsgName:
return []prometheus.Collector{SingleChatMsgProcessSuccessCounter, SingleChatMsgProcessFailedCounter, GroupChatMsgProcessSuccessCounter, GroupChatMsgProcessFailedCounter} return []prometheus.Collector{SingleChatMsgProcessSuccessCounter, SingleChatMsgProcessFailedCounter, GroupChatMsgProcessSuccessCounter, GroupChatMsgProcessFailedCounter}
case "Transfer": case "Transfer":
return []prometheus.Collector{MsgInsertRedisSuccessCounter, MsgInsertRedisFailedCounter, MsgInsertMongoSuccessCounter, MsgInsertMongoFailedCounter, SeqSetFailedCounter} return []prometheus.Collector{MsgInsertRedisSuccessCounter, MsgInsertRedisFailedCounter, MsgInsertMongoSuccessCounter, MsgInsertMongoFailedCounter, SeqSetFailedCounter}
case config2.Config.RpcRegisterName.OpenImPushName: case config.RpcRegisterName.OpenImPushName:
return []prometheus.Collector{MsgOfflinePushFailedCounter} return []prometheus.Collector{MsgOfflinePushFailedCounter}
case config2.Config.RpcRegisterName.OpenImAuthName: case config.RpcRegisterName.OpenImAuthName:
return []prometheus.Collector{UserLoginCounter} return []prometheus.Collector{UserLoginCounter}
default: default:
return nil return nil

@ -58,17 +58,20 @@ func TestNewGrpcPromObj(t *testing.T) {
} }
func TestGetGrpcCusMetrics(t *testing.T) { func TestGetGrpcCusMetrics(t *testing.T) {
conf := config2.NewGlobalConfig()
config2.InitConfig(conf, "../../config")
// Test various cases based on the switch statement in the GetGrpcCusMetrics function. // Test various cases based on the switch statement in the GetGrpcCusMetrics function.
testCases := []struct { testCases := []struct {
name string name string
expected int // The expected number of metrics for each case. expected int // The expected number of metrics for each case.
}{ }{
{config2.Config.RpcRegisterName.OpenImMessageGatewayName, 1}, {conf.RpcRegisterName.OpenImMessageGatewayName, 1},
} }
for _, tc := range testCases { for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) { t.Run(tc.name, func(t *testing.T) {
metrics := GetGrpcCusMetrics(tc.name) metrics := GetGrpcCusMetrics(tc.name, conf)
assert.Len(t, metrics, tc.expected) assert.Len(t, metrics, tc.expected)
}) })
} }

@ -85,7 +85,7 @@ func Start(
var reg *prometheus.Registry var reg *prometheus.Registry
var metric *grpcprometheus.ServerMetrics var metric *grpcprometheus.ServerMetrics
if config.Prometheus.Enable { if config.Prometheus.Enable {
cusMetrics := prommetrics.GetGrpcCusMetrics(rpcRegisterName) cusMetrics := prommetrics.GetGrpcCusMetrics(rpcRegisterName, config)
reg, metric, _ = prommetrics.NewGrpcPromObj(cusMetrics) reg, metric, _ = prommetrics.NewGrpcPromObj(cusMetrics)
options = append(options, mw.GrpcServer(), grpc.StreamInterceptor(metric.StreamServerInterceptor()), options = append(options, mw.GrpcServer(), grpc.StreamInterceptor(metric.StreamServerInterceptor()),
grpc.UnaryInterceptor(metric.UnaryServerInterceptor())) grpc.UnaryInterceptor(metric.UnaryServerInterceptor()))

@ -21,18 +21,8 @@ import (
"time" "time"
"github.com/redis/go-redis/v9" "github.com/redis/go-redis/v9"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
) )
// Mock for initCfg for testing purpose
func mockInitCfg() error {
config.Config.Mysql.Username = "root"
config.Config.Mysql.Password = "openIM123"
config.Config.Mysql.Address = []string{"127.0.0.1:13306"}
return nil
}
func TestRedis(t *testing.T) { func TestRedis(t *testing.T) {
conf, err := initCfg() conf, err := initCfg()
conf.Redis.Address = []string{ conf.Redis.Address = []string{

@ -79,7 +79,7 @@ func Main(path string) error {
if err != nil { if err != nil {
return err return err
} }
if config.Config.Mysql == nil { if conf.Mysql == nil {
return nil return nil
} }
mongoDB, err := GetMongo(conf) mongoDB, err := GetMongo(conf)
@ -121,7 +121,7 @@ func Main(path string) error {
func() error { return NewTask(mysqlDB, mongoDB, mgo.NewGroupMember, c.GroupMember) }, func() error { return NewTask(mysqlDB, mongoDB, mgo.NewGroupMember, c.GroupMember) },
func() error { return NewTask(mysqlDB, mongoDB, mgo.NewGroupRequestMgo, c.GroupRequest) }, func() error { return NewTask(mysqlDB, mongoDB, mgo.NewGroupRequestMgo, c.GroupRequest) },
func() error { return NewTask(mysqlDB, mongoDB, mgo.NewConversationMongo, c.Conversation) }, func() error { return NewTask(mysqlDB, mongoDB, mgo.NewConversationMongo, c.Conversation) },
func() error { return NewTask(mysqlDB, mongoDB, mgo.NewS3Mongo, c.Object(config.Config.Object.Enable)) }, func() error { return NewTask(mysqlDB, mongoDB, mgo.NewS3Mongo, c.Object(conf.Object.Enable)) },
func() error { return NewTask(mysqlDB, mongoDB, mgo.NewLogMongo, c.Log) }, func() error { return NewTask(mysqlDB, mongoDB, mgo.NewLogMongo, c.Log) },
func() error { return NewTask(mysqlDB, mongoDB, rtcmgo.NewSignal, c.SignalModel) }, func() error { return NewTask(mysqlDB, mongoDB, rtcmgo.NewSignal, c.SignalModel) },

Loading…
Cancel
Save