diff --git a/internal/api/init.go b/internal/api/init.go index 6e784da9a..b33f800c9 100644 --- a/internal/api/init.go +++ b/internal/api/init.go @@ -51,10 +51,6 @@ func Start(ctx context.Context, index int, config *Config) error { if err != nil { return err } - prometheusPort, err := datautil.GetElemByIndex(config.RpcConfig.Prometheus.Ports, index) - if err != nil { - return err - } var client discovery.SvcDiscoveryRegistry @@ -65,13 +61,20 @@ func Start(ctx context.Context, index int, config *Config) error { } var ( - netDone = make(chan struct{}, 1) - netErr error + netDone = make(chan struct{}, 1) + netErr error + prometheusPort int ) router := newGinRouter(client, config) if config.RpcConfig.Prometheus.Enable { go func() { + prometheusPort, err = datautil.GetElemByIndex(config.RpcConfig.Prometheus.Ports, index) + if err != nil { + netErr = err + netDone <- struct{}{} + return + } p := ginprom.NewPrometheus("app", prommetrics.GetGinCusMetrics("Api")) p.SetListenAddress(fmt.Sprintf(":%d", prometheusPort)) if err = p.Use(router); err != nil && err != http.ErrServerClosed { diff --git a/internal/msggateway/hub_server.go b/internal/msggateway/hub_server.go index bfe81b602..544d9a84b 100644 --- a/internal/msggateway/hub_server.go +++ b/internal/msggateway/hub_server.go @@ -47,7 +47,6 @@ func (s *Server) Start(ctx context.Context, index int, conf *Config) error { type Server struct { rpcPort int - prometheusPort int LongConnServer LongConnServer config *Config pushTerminal map[int]struct{} @@ -57,10 +56,9 @@ func (s *Server) SetLongConnServer(LongConnServer LongConnServer) { s.LongConnServer = LongConnServer } -func NewServer(rpcPort int, proPort int, longConnServer LongConnServer, conf *Config) *Server { +func NewServer(rpcPort int, longConnServer LongConnServer, conf *Config) *Server { s := &Server{ rpcPort: rpcPort, - prometheusPort: proPort, LongConnServer: longConnServer, pushTerminal: make(map[int]struct{}), config: conf, diff --git a/internal/msggateway/init.go b/internal/msggateway/init.go index 727ade0af..8c834d4b2 100644 --- a/internal/msggateway/init.go +++ b/internal/msggateway/init.go @@ -38,10 +38,6 @@ func Start(ctx context.Context, index int, conf *Config) error { if err != nil { return err } - prometheusPort, err := datautil.GetElemByIndex(conf.MsgGateway.Prometheus.Ports, index) - if err != nil { - return err - } rpcPort, err := datautil.GetElemByIndex(conf.MsgGateway.RPC.Ports, index) if err != nil { return err @@ -57,7 +53,7 @@ func Start(ctx context.Context, index int, conf *Config) error { return err } - hubServer := NewServer(rpcPort, prometheusPort, longServer, conf) + hubServer := NewServer(rpcPort, longServer, conf) netDone := make(chan error) go func() { err = hubServer.Start(ctx, index, conf) diff --git a/internal/msgtransfer/init.go b/internal/msgtransfer/init.go index a0d274d5e..1a4b5c63f 100644 --- a/internal/msgtransfer/init.go +++ b/internal/msgtransfer/init.go @@ -44,13 +44,13 @@ import ( ) type MsgTransfer struct { - // This consumer aggregated messages, subscribed to the topic:ws2ms_chat, + // This consumer aggregated messages, subscribed to the topic:toRedis, // the modification notification is sent to msg_to_modify topic, the message is stored in redis, Incr Redis, - // and then the message is sent to ms2pschat topic for push, and the message is sent to msg_to_mongo topic for persistence + // and then the message is sent to ms2pschat topic for push, and the message is sent to toMongo topic for persistence historyCH *OnlineHistoryRedisConsumerHandler historyMongoCH *OnlineHistoryMongoConsumerHandler // mongoDB batch insert, delete messages in redis after success, - // and handle the deletion notification message deleted subscriptions topic: msg_to_mongo + // and handle the deletion notification message deleted subscriptions topic: to ctx context.Context cancel context.CancelFunc } @@ -82,7 +82,6 @@ func Start(ctx context.Context, index int, config *Config) error { } client.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, "round_robin"))) - //todo MsgCacheTimeout msgModel := cache.NewMsgCache(rdb, config.RedisConfig.EnablePipeline) seqModel := cache.NewSeqCache(rdb) msgDocModel, err := mgo.NewMsgMongo(mgocli.GetDB()) @@ -111,10 +110,7 @@ func Start(ctx context.Context, index int, config *Config) error { } func (m *MsgTransfer) Start(index int, config *Config) error { - prometheusPort, err := datautil.GetElemByIndex(config.MsgTransfer.Prometheus.Ports, index) - if err != nil { - return err - } + m.ctx, m.cancel = context.WithCancel(context.Background()) var ( @@ -124,20 +120,26 @@ func (m *MsgTransfer) Start(index int, config *Config) error { go m.historyCH.historyConsumerGroup.RegisterHandleAndConsumer(m.ctx, m.historyCH) go m.historyMongoCH.historyConsumerGroup.RegisterHandleAndConsumer(m.ctx, m.historyMongoCH) - err = m.historyCH.redisMessageBatches.Start() + err := m.historyCH.redisMessageBatches.Start() if err != nil { return err } if config.MsgTransfer.Prometheus.Enable { go func() { + prometheusPort, err := datautil.GetElemByIndex(config.MsgTransfer.Prometheus.Ports, index) + if err != nil { + netErr = err + netDone <- struct{}{} + return + } proreg := prometheus.NewRegistry() proreg.MustRegister( collectors.NewGoCollector(), ) proreg.MustRegister(prommetrics.GetGrpcCusMetrics("Transfer", &config.Share)...) http.Handle("/metrics", promhttp.HandlerFor(proreg, promhttp.HandlerOpts{Registry: proreg})) - err := http.ListenAndServe(fmt.Sprintf(":%d", prometheusPort), nil) + err = http.ListenAndServe(fmt.Sprintf(":%d", prometheusPort), nil) if err != nil && err != http.ErrServerClosed { netErr = errs.WrapMsg(err, "prometheus start error", "prometheusPort", prometheusPort) netDone <- struct{}{} diff --git a/pkg/common/startrpc/start.go b/pkg/common/startrpc/start.go index ebcd5aa7c..1e8f3248d 100644 --- a/pkg/common/startrpc/start.go +++ b/pkg/common/startrpc/start.go @@ -52,12 +52,9 @@ func Start[T any](ctx context.Context, zookeeperConfig *config2.ZooKeeper, prome if err != nil { return err } - prometheusPort, err := datautil.GetElemByIndex(prometheusConfig.Ports, index) - if err != nil { - return err - } + log.CInfo(ctx, "RPC server is initializing", "rpcRegisterName", rpcRegisterName, "rpcPort", rpcPort, - "prometheusPort", prometheusPort) + "prometheusPorts", prometheusConfig.Ports) rpcTcpAddr := net.JoinHostPort(network.GetListenIP(listenIP), strconv.Itoa(rpcPort)) listener, err := net.Listen( "tcp", @@ -117,9 +114,14 @@ func Start[T any](ctx context.Context, zookeeperConfig *config2.ZooKeeper, prome netErr error httpServer *http.Server ) - - go func() { - if prometheusConfig.Enable && prometheusPort != 0 { + if prometheusConfig.Enable { + go func() { + prometheusPort, err := datautil.GetElemByIndex(prometheusConfig.Ports, index) + if err != nil { + netErr = err + netDone <- struct{}{} + return + } metric.InitializeMetrics(srv) // Create a HTTP server for prometheus. httpServer = &http.Server{Handler: promhttp.HandlerFor(reg, promhttp.HandlerOpts{}), Addr: fmt.Sprintf("0.0.0.0:%d", prometheusPort)} @@ -127,8 +129,8 @@ func Start[T any](ctx context.Context, zookeeperConfig *config2.ZooKeeper, prome netErr = errs.WrapMsg(err, "prometheus start err", httpServer.Addr) netDone <- struct{}{} } - } - }() + }() + } go func() { err := srv.Serve(listener) diff --git a/pkg/tools/batcher/batcher.go b/pkg/tools/batcher/batcher.go index 632ce512c..fa98bb90d 100644 --- a/pkg/tools/batcher/batcher.go +++ b/pkg/tools/batcher/batcher.go @@ -24,7 +24,7 @@ type Config struct { dataBuffer int // The size of the main data channel worker int // Number of coroutines processed in parallel interval time.Duration // Time of message aggregations - syncWait bool // Whether to wait synchronously after distributing messages + syncWait bool // Whether to wait synchronously after distributing messages have been consumed } type Option func(c *Config)