diff --git a/cmd/openim-api/main.go b/cmd/openim-api/main.go index 438dfd13e..738a12111 100644 --- a/cmd/openim-api/main.go +++ b/cmd/openim-api/main.go @@ -17,6 +17,7 @@ package main import ( "context" "fmt" + util "github.com/openimsdk/open-im-server/v3/pkg/util/genutil" "net" "net/http" _ "net/http/pprof" @@ -38,7 +39,6 @@ import ( kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister" ginprom "github.com/openimsdk/open-im-server/v3/pkg/common/ginprometheus" "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" - util "github.com/openimsdk/open-im-server/v3/pkg/util/genutil" ) func main() { @@ -75,12 +75,21 @@ func run(port int, proPort int) error { if err = client.RegisterConf2Registry(constant.OpenIMCommonConfigKey, config.Config.EncodeConfig()); err != nil { return err } - + var ( + netDone = make(chan struct{}, 1) + netErr error + ) router := api.NewGinRouter(client, rdb) if config.Config.Prometheus.Enable { - p := ginprom.NewPrometheus("app", prommetrics.GetGinCusMetrics("Api")) - p.SetListenAddress(fmt.Sprintf(":%d", proPort)) - p.Use(router) + go func() { + p := ginprom.NewPrometheus("app", prommetrics.GetGinCusMetrics("Api")) + p.SetListenAddress(fmt.Sprintf(":%d", proPort)) + if err := p.Use(router); err != nil && err != http.ErrServerClosed { + netErr = errs.Wrap(err, fmt.Sprintf("prometheus start err: %d", proPort)) + netDone <- struct{}{} + } + }() + } var address string @@ -89,17 +98,15 @@ func run(port int, proPort int) error { } else { address = net.JoinHostPort("0.0.0.0", strconv.Itoa(port)) } - var ( - netDone = make(chan struct{}, 1) - netErr error - ) + server := http.Server{Addr: address, Handler: router} go func() { err = server.ListenAndServe() if err != nil && err != http.ErrServerClosed { - netErr = errs.Wrap(err, "api start err", server.Addr) - close(netDone) + netErr = errs.Wrap(err, fmt.Sprintf("api start err: %s", server.Addr)) + netDone <- struct{}{} + } }() @@ -116,6 +123,7 @@ func run(port int, proPort int) error { return errs.Wrap(err, "shutdown err") } case <-netDone: + close(netDone) return netErr } return nil diff --git a/internal/msgtransfer/init.go b/internal/msgtransfer/init.go index 65a6b1935..34532f9e2 100644 --- a/internal/msgtransfer/init.go +++ b/internal/msgtransfer/init.go @@ -18,22 +18,14 @@ import ( "context" "errors" "fmt" + "github.com/OpenIMSDK/tools/errs" + util "github.com/openimsdk/open-im-server/v3/pkg/util/genutil" + "net/http" "os" "os/signal" "syscall" - "time" - "net/http" - "sync" - - "github.com/OpenIMSDK/tools/errs" "github.com/OpenIMSDK/tools/mw" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/collectors" - "github.com/prometheus/client_golang/prometheus/promhttp" - "google.golang.org/grpc" - "google.golang.org/grpc/credentials/insecure" - "github.com/OpenIMSDK/tools/log" "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/db/cache" "github.com/openimsdk/open-im-server/v3/pkg/common/db/controller" @@ -41,12 +33,18 @@ import ( kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister" "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" "github.com/openimsdk/open-im-server/v3/pkg/rpcclient" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/collectors" + "github.com/prometheus/client_golang/prometheus/promhttp" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" ) type MsgTransfer struct { historyCH *OnlineHistoryRedisConsumerHandler // 这个消费者聚合消息, 订阅的topic:ws2ms_chat, 修改通知发往msg_to_modify topic, 消息存入redis后Incr Redis, 再发消息到ms2pschat topic推送, 发消息到msg_to_mongo topic持久化 historyMongoCH *OnlineHistoryMongoConsumerHandler // mongoDB批量插入, 成功后删除redis中消息,以及处理删除通知消息删除的 订阅的topic: msg_to_mongo - // modifyCH *ModifyMsgConsumerHandler // 负责消费修改消息通知的consumer, 订阅的topic: msg_to_modify + ctx context.Context + cancel context.CancelFunc } func StartTransfer(prometheusPort int) error { @@ -64,10 +62,6 @@ func StartTransfer(prometheusPort int) error { return err } client, err := kdisc.NewDiscoveryRegister(config.Config.Envs.Discovery) - /* - client, err := openkeeper.NewClient(config.Config.Zookeeper.ZkAddr, config.Config.Zookeeper.Schema, - openkeeper.WithFreq(time.Hour), openkeeper.WithRoundRobin(), openkeeper.WithUserNameAndPassword(config.Config.Zookeeper.Username, - config.Config.Zookeeper.Password), openkeeper.WithTimeout(10), openkeeper.WithLogger(log.NewZkLogger()))*/ if err != nil { return err } @@ -108,27 +102,19 @@ func NewMsgTransfer(msgDatabase controller.CommonMsgDatabase, conversationRpcCli } func (m *MsgTransfer) Start(prometheusPort int) error { - ctx := context.Background() fmt.Println("start msg transfer", "prometheusPort:", prometheusPort) if prometheusPort <= 0 { return errs.Wrap(errors.New("prometheusPort not correct")) } + m.ctx, m.cancel = context.WithCancel(context.Background()) - var wg sync.WaitGroup - - wg.Add(1) - go func() { - defer wg.Done() + var ( + netDone = make(chan struct{}, 1) + netErr error + ) - m.historyCH.historyConsumerGroup.RegisterHandleAndConsumer(ctx, m.historyCH) - }() - - wg.Add(1) - go func() { - defer wg.Done() - - m.historyMongoCH.historyConsumerGroup.RegisterHandleAndConsumer(ctx, m.historyMongoCH) - }() + go m.historyCH.historyConsumerGroup.RegisterHandleAndConsumer(m.ctx, m.historyCH) + go m.historyMongoCH.historyConsumerGroup.RegisterHandleAndConsumer(m.ctx, m.historyMongoCH) if config.Config.Prometheus.Enable { go func() { @@ -140,30 +126,28 @@ func (m *MsgTransfer) Start(prometheusPort int) error { http.Handle("/metrics", promhttp.HandlerFor(proreg, promhttp.HandlerOpts{Registry: proreg})) err := http.ListenAndServe(fmt.Sprintf(":%d", prometheusPort), nil) if err != nil && err != http.ErrServerClosed { - panic(err) + netErr = errs.Wrap(err, fmt.Sprintf("prometheus start err: %d", prometheusPort)) + netDone <- struct{}{} } }() } sigs := make(chan os.Signal, 1) - signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT) - <-sigs - - // graceful close kafka client. - go m.historyCH.historyConsumerGroup.Close() - go m.historyMongoCH.historyConsumerGroup.Close() - - done := make(chan struct{}, 1) - go func() { - wg.Wait() - close(done) - }() - + signal.Notify(sigs, syscall.SIGUSR1) select { - case <-done: - log.ZInfo(context.Background(), "msgtrasfer exit successfully") - case <-time.After(15 * time.Second): - log.ZError(context.Background(), "msgtransfer force to exit, timeout 15s", nil) + case <-sigs: + util.SIGUSR1Exit() + // graceful close kafka client. + m.cancel() + m.historyCH.historyConsumerGroup.Close() + m.historyMongoCH.historyConsumerGroup.Close() + + case <-netDone: + m.cancel() + m.historyCH.historyConsumerGroup.Close() + m.historyMongoCH.historyConsumerGroup.Close() + close(netDone) + return netErr } return nil diff --git a/internal/push/push_rpc_server.go b/internal/push/push_rpc_server.go index c1226ce6b..caaf95525 100644 --- a/internal/push/push_rpc_server.go +++ b/internal/push/push_rpc_server.go @@ -16,8 +16,6 @@ package push import ( "context" - "sync" - "github.com/OpenIMSDK/tools/utils" "google.golang.org/grpc" @@ -58,23 +56,18 @@ func Start(client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) e &groupRpcClient, &msgRpcClient, ) - var wg sync.WaitGroup - wg.Add(2) - go func() { - defer wg.Done() - pbpush.RegisterPushMsgServiceServer(server, &pushServer{ - pusher: pusher, - }) - }() + + pbpush.RegisterPushMsgServiceServer(server, &pushServer{ + pusher: pusher, + }) + consumer, err := NewConsumer(pusher) if err != nil { return err } - go func() { - defer wg.Done() - consumer.Start() - }() - wg.Wait() + + consumer.Start() + return nil } diff --git a/pkg/common/ginprometheus/ginprometheus.go b/pkg/common/ginprometheus/ginprometheus.go index 1ee8f8e34..c2e6bdcca 100644 --- a/pkg/common/ginprometheus/ginprometheus.go +++ b/pkg/common/ginprometheus/ginprometheus.go @@ -197,30 +197,32 @@ func (p *Prometheus) SetListenAddressWithRouter(listenAddress string, r *gin.Eng } // SetMetricsPath set metrics paths. -func (p *Prometheus) SetMetricsPath(e *gin.Engine) { +func (p *Prometheus) SetMetricsPath(e *gin.Engine) error { if p.listenAddress != "" { p.router.GET(p.MetricsPath, prometheusHandler()) - p.runServer() + return p.runServer() } else { e.GET(p.MetricsPath, prometheusHandler()) + return nil } } // SetMetricsPathWithAuth set metrics paths with authentication. -func (p *Prometheus) SetMetricsPathWithAuth(e *gin.Engine, accounts gin.Accounts) { +func (p *Prometheus) SetMetricsPathWithAuth(e *gin.Engine, accounts gin.Accounts) error { if p.listenAddress != "" { p.router.GET(p.MetricsPath, gin.BasicAuth(accounts), prometheusHandler()) - p.runServer() + return p.runServer() } else { e.GET(p.MetricsPath, gin.BasicAuth(accounts), prometheusHandler()) + return nil } } -func (p *Prometheus) runServer() { - go p.router.Run(p.listenAddress) +func (p *Prometheus) runServer() error { + return p.router.Run(p.listenAddress) } func (p *Prometheus) getMetrics() []byte { @@ -366,15 +368,15 @@ func (p *Prometheus) registerMetrics(subsystem string) { } // Use adds the middleware to a gin engine. -func (p *Prometheus) Use(e *gin.Engine) { +func (p *Prometheus) Use(e *gin.Engine) error { e.Use(p.HandlerFunc()) - p.SetMetricsPath(e) + return p.SetMetricsPath(e) } // UseWithAuth adds the middleware to a gin engine with BasicAuth. -func (p *Prometheus) UseWithAuth(e *gin.Engine, accounts gin.Accounts) { +func (p *Prometheus) UseWithAuth(e *gin.Engine, accounts gin.Accounts) error { e.Use(p.HandlerFunc()) - p.SetMetricsPathWithAuth(e, accounts) + return p.SetMetricsPathWithAuth(e, accounts) } // HandlerFunc defines handler function for middleware. diff --git a/pkg/common/kafka/consumer_group.go b/pkg/common/kafka/consumer_group.go index 3f444cc1f..f20f8d4eb 100644 --- a/pkg/common/kafka/consumer_group.go +++ b/pkg/common/kafka/consumer_group.go @@ -18,16 +18,13 @@ import ( "context" "errors" "github.com/IBM/sarama" - "strings" "github.com/OpenIMSDK/tools/errs" "github.com/OpenIMSDK/tools/log" "github.com/openimsdk/open-im-server/v3/pkg/common/config" + "strings" ) type MConsumerGroup struct { - ctx context.Context - cancel context.CancelFunc - sarama.ConsumerGroup groupID string topics []string @@ -55,9 +52,7 @@ func NewMConsumerGroup(consumerConfig *MConsumerGroupConfig, topics, addrs []str return nil, errs.Wrap(err, strings.Join(topics, ","), strings.Join(addrs, ","), groupID, config.Config.Kafka.Username, config.Config.Kafka.Password) } - ctx, cancel := context.WithCancel(context.Background()) return &MConsumerGroup{ - ctx, cancel, consumerGroup, groupID, topics, @@ -68,27 +63,22 @@ func (mc *MConsumerGroup) GetContextFromMsg(cMsg *sarama.ConsumerMessage) contex return GetContextWithMQHeader(cMsg.Headers) } -func (mc *MConsumerGroup) RegisterHandleAndConsumer(ctx context.Context, handler sarama.ConsumerGroupHandler) { - log.ZDebug(context.Background(), "register consumer group", "groupID", mc.groupID) +func (mc *MConsumerGroup) RegisterHandleAndConsumer(ctx context.Context, handler sarama.ConsumerGroupHandler) error { + log.ZDebug(ctx, "register consumer group", "groupID", mc.groupID) for { - err := mc.ConsumerGroup.Consume(mc.ctx, mc.topics, handler) + err := mc.ConsumerGroup.Consume(ctx, mc.topics, handler) if errors.Is(err, sarama.ErrClosedConsumerGroup) { - return + return nil } - if mc.ctx.Err() != nil { - return + if errors.Is(err, context.Canceled) { + return nil } - if err != nil { log.ZWarn(ctx, "consume err", err, "topic", mc.topics, "groupID", mc.groupID) } - if ctx.Err() != nil { - return - } } } -func (mc *MConsumerGroup) Close() { - mc.cancel() - mc.ConsumerGroup.Close() +func (mc *MConsumerGroup) Close() error { + return mc.ConsumerGroup.Close() }