diff --git a/cmd/openim-api/main.go b/cmd/openim-api/main.go index bbb5eb968..c8746bc20 100644 --- a/cmd/openim-api/main.go +++ b/cmd/openim-api/main.go @@ -17,6 +17,7 @@ package main import ( "context" "fmt" + util "github.com/openimsdk/open-im-server/v3/pkg/util/genutil" "net" "net/http" _ "net/http/pprof" @@ -46,8 +47,7 @@ func main() { apiCmd.AddPrometheusPortFlag() apiCmd.AddApi(run) if err := apiCmd.Execute(); err != nil { - fmt.Fprintf(os.Stderr, "\n\nexit -1: \n%+v\n\n", err) - os.Exit(-1) + util.ExitWithError(err) } } @@ -76,12 +76,21 @@ func run(port int, proPort int) error { if err = client.RegisterConf2Registry(constant.OpenIMCommonConfigKey, config.Config.EncodeConfig()); err != nil { return err } - + var ( + netDone = make(chan struct{}, 1) + netErr error + ) router := api.NewGinRouter(client, rdb) if config.Config.Prometheus.Enable { - p := ginprom.NewPrometheus("app", prommetrics.GetGinCusMetrics("Api")) - p.SetListenAddress(fmt.Sprintf(":%d", proPort)) - p.Use(router) + go func() { + p := ginprom.NewPrometheus("app", prommetrics.GetGinCusMetrics("Api")) + p.SetListenAddress(fmt.Sprintf(":%d", proPort)) + if err = p.Use(router); err != nil && err != http.ErrServerClosed { + netErr = errs.Wrap(err, fmt.Sprintf("prometheus start err: %d", proPort)) + netDone <- struct{}{} + } + }() + } var address string @@ -92,24 +101,31 @@ func run(port int, proPort int) error { } server := http.Server{Addr: address, Handler: router} + go func() { err = server.ListenAndServe() if err != nil && err != http.ErrServerClosed { - os.Exit(1) + netErr = errs.Wrap(err, fmt.Sprintf("api start err: %s", server.Addr)) + netDone <- struct{}{} + } }() sigs := make(chan os.Signal, 1) - signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT) - <-sigs + signal.Notify(sigs, syscall.SIGTERM) ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) defer cancel() - - // graceful shutdown operation. - if err := server.Shutdown(ctx); err != nil { - return err + select { + case <-sigs: + util.SIGUSR1Exit() + err := server.Shutdown(ctx) + if err != nil { + return errs.Wrap(err, "shutdown err") + } + case <-netDone: + close(netDone) + return netErr } - return nil } diff --git a/cmd/openim-cmdutils/main.go b/cmd/openim-cmdutils/main.go index 45b324766..f6b788933 100644 --- a/cmd/openim-cmdutils/main.go +++ b/cmd/openim-cmdutils/main.go @@ -15,10 +15,8 @@ package main import ( - "fmt" - "os" - "github.com/openimsdk/open-im-server/v3/pkg/common/cmd" + util "github.com/openimsdk/open-im-server/v3/pkg/util/genutil" ) func main() { @@ -57,7 +55,6 @@ func main() { // openIM clear msg --clearAll msgUtilsCmd.AddCommand(&getCmd.Command, &fixCmd.Command, &clearCmd.Command) if err := msgUtilsCmd.Execute(); err != nil { - fmt.Fprintf(os.Stderr, "\n\nexit -1: \n%+v\n\n", err) - os.Exit(-1) + util.ExitWithError(err) } } diff --git a/cmd/openim-crontask/main.go b/cmd/openim-crontask/main.go index 324001690..b284fd773 100644 --- a/cmd/openim-crontask/main.go +++ b/cmd/openim-crontask/main.go @@ -15,17 +15,14 @@ package main import ( - "fmt" - "os" - "github.com/openimsdk/open-im-server/v3/internal/tools" "github.com/openimsdk/open-im-server/v3/pkg/common/cmd" + util "github.com/openimsdk/open-im-server/v3/pkg/util/genutil" ) func main() { cronTaskCmd := cmd.NewCronTaskCmd() if err := cronTaskCmd.Exec(tools.StartTask); err != nil { - fmt.Fprintf(os.Stderr, "\n\nexit -1: \n%+v\n\n", err) - os.Exit(-1) + util.ExitWithError(err) } } diff --git a/cmd/openim-msggateway/main.go b/cmd/openim-msggateway/main.go index 5339891c8..ed67b8f5d 100644 --- a/cmd/openim-msggateway/main.go +++ b/cmd/openim-msggateway/main.go @@ -15,10 +15,8 @@ package main import ( - "fmt" - "os" - "github.com/openimsdk/open-im-server/v3/pkg/common/cmd" + util "github.com/openimsdk/open-im-server/v3/pkg/util/genutil" ) func main() { @@ -28,7 +26,6 @@ func main() { msgGatewayCmd.AddPrometheusPortFlag() if err := msgGatewayCmd.Exec(); err != nil { - fmt.Fprintf(os.Stderr, "\n\nexit -1: \n%+v\n\n", err) - os.Exit(-1) + util.ExitWithError(err) } } diff --git a/cmd/openim-msgtransfer/main.go b/cmd/openim-msgtransfer/main.go index cf1b44a55..84fbbd2ea 100644 --- a/cmd/openim-msgtransfer/main.go +++ b/cmd/openim-msgtransfer/main.go @@ -15,10 +15,8 @@ package main import ( - "fmt" - "os" - "github.com/openimsdk/open-im-server/v3/pkg/common/cmd" + util "github.com/openimsdk/open-im-server/v3/pkg/util/genutil" ) func main() { @@ -26,7 +24,6 @@ func main() { msgTransferCmd.AddPrometheusPortFlag() msgTransferCmd.AddTransferProgressFlag() if err := msgTransferCmd.Exec(); err != nil { - fmt.Fprintf(os.Stderr, "\n\nexit -1: \n%+v\n\n", err) - os.Exit(-1) + util.ExitWithError(err) } } diff --git a/cmd/openim-push/main.go b/cmd/openim-push/main.go index 77f75cb4e..e0539fa52 100644 --- a/cmd/openim-push/main.go +++ b/cmd/openim-push/main.go @@ -15,12 +15,10 @@ package main import ( - "fmt" - "os" - "github.com/openimsdk/open-im-server/v3/internal/push" "github.com/openimsdk/open-im-server/v3/pkg/common/cmd" "github.com/openimsdk/open-im-server/v3/pkg/common/config" + util "github.com/openimsdk/open-im-server/v3/pkg/util/genutil" ) func main() { @@ -31,7 +29,6 @@ func main() { panic(err.Error()) } if err := pushCmd.StartSvr(config.Config.RpcRegisterName.OpenImPushName, push.Start); err != nil { - fmt.Fprintf(os.Stderr, "\n\nexit -1: \n%+v\n\n", err) - os.Exit(-1) + util.ExitWithError(err) } } diff --git a/cmd/openim-rpc/openim-rpc-auth/main.go b/cmd/openim-rpc/openim-rpc-auth/main.go index b29efd484..b526c3b86 100644 --- a/cmd/openim-rpc/openim-rpc-auth/main.go +++ b/cmd/openim-rpc/openim-rpc-auth/main.go @@ -15,12 +15,10 @@ package main import ( - "fmt" - "os" - "github.com/openimsdk/open-im-server/v3/internal/rpc/auth" "github.com/openimsdk/open-im-server/v3/pkg/common/cmd" "github.com/openimsdk/open-im-server/v3/pkg/common/config" + util "github.com/openimsdk/open-im-server/v3/pkg/util/genutil" ) func main() { @@ -31,7 +29,7 @@ func main() { panic(err.Error()) } if err := authCmd.StartSvr(config.Config.RpcRegisterName.OpenImAuthName, auth.Start); err != nil { - fmt.Fprintf(os.Stderr, "\n\nexit -1: \n%+v\n\n", err) - os.Exit(-1) + util.ExitWithError(err) } + } diff --git a/cmd/openim-rpc/openim-rpc-conversation/main.go b/cmd/openim-rpc/openim-rpc-conversation/main.go index f9ac8cd27..bde191c51 100644 --- a/cmd/openim-rpc/openim-rpc-conversation/main.go +++ b/cmd/openim-rpc/openim-rpc-conversation/main.go @@ -15,12 +15,10 @@ package main import ( - "fmt" - "os" - "github.com/openimsdk/open-im-server/v3/internal/rpc/conversation" "github.com/openimsdk/open-im-server/v3/pkg/common/cmd" "github.com/openimsdk/open-im-server/v3/pkg/common/config" + util "github.com/openimsdk/open-im-server/v3/pkg/util/genutil" ) func main() { @@ -31,7 +29,6 @@ func main() { panic(err.Error()) } if err := rpcCmd.StartSvr(config.Config.RpcRegisterName.OpenImConversationName, conversation.Start); err != nil { - fmt.Fprintf(os.Stderr, "\n\nexit -1: \n%+v\n\n", err) - os.Exit(-1) + util.ExitWithError(err) } } diff --git a/cmd/openim-rpc/openim-rpc-friend/main.go b/cmd/openim-rpc/openim-rpc-friend/main.go index 82d71d522..8eeb9c8e1 100644 --- a/cmd/openim-rpc/openim-rpc-friend/main.go +++ b/cmd/openim-rpc/openim-rpc-friend/main.go @@ -15,12 +15,10 @@ package main import ( - "fmt" - "os" - "github.com/openimsdk/open-im-server/v3/internal/rpc/friend" "github.com/openimsdk/open-im-server/v3/pkg/common/cmd" "github.com/openimsdk/open-im-server/v3/pkg/common/config" + util "github.com/openimsdk/open-im-server/v3/pkg/util/genutil" ) func main() { @@ -31,7 +29,6 @@ func main() { panic(err.Error()) } if err := rpcCmd.StartSvr(config.Config.RpcRegisterName.OpenImFriendName, friend.Start); err != nil { - fmt.Fprintf(os.Stderr, "\n\nexit -1: \n%+v\n\n", err) - os.Exit(-1) + util.ExitWithError(err) } } diff --git a/cmd/openim-rpc/openim-rpc-group/main.go b/cmd/openim-rpc/openim-rpc-group/main.go index 360042f84..a5842ffd1 100644 --- a/cmd/openim-rpc/openim-rpc-group/main.go +++ b/cmd/openim-rpc/openim-rpc-group/main.go @@ -15,12 +15,10 @@ package main import ( - "fmt" - "os" - "github.com/openimsdk/open-im-server/v3/internal/rpc/group" "github.com/openimsdk/open-im-server/v3/pkg/common/cmd" "github.com/openimsdk/open-im-server/v3/pkg/common/config" + util "github.com/openimsdk/open-im-server/v3/pkg/util/genutil" ) func main() { @@ -31,7 +29,6 @@ func main() { panic(err.Error()) } if err := rpcCmd.StartSvr(config.Config.RpcRegisterName.OpenImGroupName, group.Start); err != nil { - fmt.Fprintf(os.Stderr, "\n\nexit -1: \n%+v\n\n", err) - os.Exit(-1) + util.ExitWithError(err) } } diff --git a/cmd/openim-rpc/openim-rpc-msg/main.go b/cmd/openim-rpc/openim-rpc-msg/main.go index bed57f522..b3895a502 100644 --- a/cmd/openim-rpc/openim-rpc-msg/main.go +++ b/cmd/openim-rpc/openim-rpc-msg/main.go @@ -15,12 +15,10 @@ package main import ( - "fmt" - "os" - "github.com/openimsdk/open-im-server/v3/internal/rpc/msg" "github.com/openimsdk/open-im-server/v3/pkg/common/cmd" "github.com/openimsdk/open-im-server/v3/pkg/common/config" + util "github.com/openimsdk/open-im-server/v3/pkg/util/genutil" ) func main() { @@ -31,7 +29,6 @@ func main() { panic(err.Error()) } if err := rpcCmd.StartSvr(config.Config.RpcRegisterName.OpenImMsgName, msg.Start); err != nil { - fmt.Fprintf(os.Stderr, "\n\nexit -1: \n%+v\n\n", err) - os.Exit(-1) + util.ExitWithError(err) } } diff --git a/cmd/openim-rpc/openim-rpc-third/main.go b/cmd/openim-rpc/openim-rpc-third/main.go index 4868ce149..8f390bb6a 100644 --- a/cmd/openim-rpc/openim-rpc-third/main.go +++ b/cmd/openim-rpc/openim-rpc-third/main.go @@ -15,12 +15,10 @@ package main import ( - "fmt" - "os" - "github.com/openimsdk/open-im-server/v3/internal/rpc/third" "github.com/openimsdk/open-im-server/v3/pkg/common/cmd" "github.com/openimsdk/open-im-server/v3/pkg/common/config" + util "github.com/openimsdk/open-im-server/v3/pkg/util/genutil" ) func main() { @@ -31,7 +29,6 @@ func main() { panic(err.Error()) } if err := rpcCmd.StartSvr(config.Config.RpcRegisterName.OpenImThirdName, third.Start); err != nil { - fmt.Fprintf(os.Stderr, "\n\nexit -1: \n%+v\n\n", err) - os.Exit(-1) + util.ExitWithError(err) } } diff --git a/cmd/openim-rpc/openim-rpc-user/main.go b/cmd/openim-rpc/openim-rpc-user/main.go index a77a2f768..6994ea2b1 100644 --- a/cmd/openim-rpc/openim-rpc-user/main.go +++ b/cmd/openim-rpc/openim-rpc-user/main.go @@ -15,12 +15,10 @@ package main import ( - "fmt" - "os" - "github.com/openimsdk/open-im-server/v3/internal/rpc/user" "github.com/openimsdk/open-im-server/v3/pkg/common/cmd" "github.com/openimsdk/open-im-server/v3/pkg/common/config" + util "github.com/openimsdk/open-im-server/v3/pkg/util/genutil" ) func main() { @@ -31,7 +29,6 @@ func main() { panic(err.Error()) } if err := rpcCmd.StartSvr(config.Config.RpcRegisterName.OpenImUserName, user.Start); err != nil { - fmt.Fprintf(os.Stderr, "\n\nexit -1: \n%+v\n\n", err) - os.Exit(-1) + util.ExitWithError(err) } } diff --git a/internal/msggateway/init.go b/internal/msggateway/init.go index aeba0a24a..321407f7e 100644 --- a/internal/msggateway/init.go +++ b/internal/msggateway/init.go @@ -18,9 +18,6 @@ import ( "fmt" "time" - "github.com/OpenIMSDK/tools/utils" - "golang.org/x/sync/errgroup" - "github.com/openimsdk/open-im-server/v3/pkg/common/config" ) @@ -46,20 +43,12 @@ func RunWsAndServer(rpcPort, wsPort, prometheusPort int) error { } hubServer := NewServer(rpcPort, prometheusPort, longServer) - - wg := errgroup.Group{} - wg.Go(func() error { + netDone := make(chan error) + go func() { err = hubServer.Start() if err != nil { - return utils.Wrap1(err) + netDone <- err } - return err - }) - - wg.Go(func() error { - return hubServer.LongConnServer.Run() - }) - - err = wg.Wait() - return err + }() + return hubServer.LongConnServer.Run(netDone) } diff --git a/internal/msggateway/n_ws_server.go b/internal/msggateway/n_ws_server.go index 01d92b92a..c16da7c64 100644 --- a/internal/msggateway/n_ws_server.go +++ b/internal/msggateway/n_ws_server.go @@ -20,12 +20,9 @@ import ( "errors" "fmt" "net/http" - "os" - "os/signal" "strconv" "sync" "sync/atomic" - "syscall" "time" "github.com/OpenIMSDK/tools/apiresp" @@ -49,7 +46,7 @@ import ( ) type LongConnServer interface { - Run() error + Run(done chan error) error wsHandler(w http.ResponseWriter, r *http.Request) GetUserAllCons(userID string) ([]*Client, bool) GetUserPlatformCons(userID string, platform int) ([]*Client, bool, bool) @@ -169,23 +166,20 @@ func NewWsServer(opts ...Option) (*WsServer, error) { }, nil } -func (ws *WsServer) Run() error { +func (ws *WsServer) Run(done chan error) error { var ( - client *Client - wg errgroup.Group - - sigs = make(chan os.Signal, 1) - done = make(chan struct{}, 1) + client *Client + netErr error + shutdownDone = make(chan struct{}, 1) ) server := http.Server{Addr: ":" + utils.IntToString(ws.port), Handler: nil} - wg.Go(func() error { + go func() { for { select { - case <-done: - return nil - + case <-shutdownDone: + return case client = <-ws.registerChan: ws.registerClient(client) case client = <-ws.unregisterChan: @@ -194,33 +188,32 @@ func (ws *WsServer) Run() error { ws.multiTerminalLoginChecker(onlineInfo.clientOK, onlineInfo.oldClients, onlineInfo.newClient) } } - }) - - wg.Go(func() error { - http.HandleFunc("/", ws.wsHandler) - return server.ListenAndServe() - }) - - signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT) - <-sigs - + }() + netDone := make(chan struct{}, 1) go func() { - ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) - defer cancel() - - // graceful exit operation for server - _ = server.Shutdown(ctx) - _ = wg.Wait() - close(done) + http.HandleFunc("/", ws.wsHandler) + err := server.ListenAndServe() + if err != nil && err != http.ErrServerClosed { + netErr = errs.Wrap(err, "ws start err", server.Addr) + close(netDone) + } }() - + ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) + defer cancel() + var err error select { - case <-done: - return nil - - case <-time.After(15 * time.Second): - return utils.Wrap1(errors.New("timeout exit")) + case err = <-done: + sErr := server.Shutdown(ctx) + if sErr != nil { + return errs.Wrap(sErr, "shutdown err") + } + close(shutdownDone) + if err != nil { + return err + } + case <-netDone: } + return netErr } diff --git a/internal/msgtransfer/init.go b/internal/msgtransfer/init.go index 65518c324..062017f44 100644 --- a/internal/msgtransfer/init.go +++ b/internal/msgtransfer/init.go @@ -18,23 +18,11 @@ import ( "context" "errors" "fmt" - "net/http" - "os" - "os/signal" - "sync" - "syscall" - "time" - "github.com/OpenIMSDK/tools/errs" - "github.com/OpenIMSDK/tools/log" - "github.com/OpenIMSDK/tools/mw" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/collectors" - "github.com/prometheus/client_golang/prometheus/promhttp" - "google.golang.org/grpc" - "google.golang.org/grpc/credentials/insecure" + util "github.com/openimsdk/open-im-server/v3/pkg/util/genutil" + "github.com/OpenIMSDK/tools/mw" "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/db/cache" "github.com/openimsdk/open-im-server/v3/pkg/common/db/controller" @@ -42,12 +30,22 @@ import ( kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister" "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" "github.com/openimsdk/open-im-server/v3/pkg/rpcclient" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/collectors" + "github.com/prometheus/client_golang/prometheus/promhttp" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + "net/http" + "os" + "os/signal" + "syscall" ) type MsgTransfer struct { historyCH *OnlineHistoryRedisConsumerHandler // 这个消费者聚合消息, 订阅的topic:ws2ms_chat, 修改通知发往msg_to_modify topic, 消息存入redis后Incr Redis, 再发消息到ms2pschat topic推送, 发消息到msg_to_mongo topic持久化 historyMongoCH *OnlineHistoryMongoConsumerHandler // mongoDB批量插入, 成功后删除redis中消息,以及处理删除通知消息删除的 订阅的topic: msg_to_mongo - // modifyCH *ModifyMsgConsumerHandler // 负责消费修改消息通知的consumer, 订阅的topic: msg_to_modify + ctx context.Context + cancel context.CancelFunc } func StartTransfer(prometheusPort int) error { @@ -65,10 +63,6 @@ func StartTransfer(prometheusPort int) error { return err } client, err := kdisc.NewDiscoveryRegister(config.Config.Envs.Discovery) - /* - client, err := openkeeper.NewClient(config.Config.Zookeeper.ZkAddr, config.Config.Zookeeper.Schema, - openkeeper.WithFreq(time.Hour), openkeeper.WithRoundRobin(), openkeeper.WithUserNameAndPassword(config.Config.Zookeeper.Username, - config.Config.Zookeeper.Password), openkeeper.WithTimeout(10), openkeeper.WithLogger(log.NewZkLogger()))*/ if err != nil { return err } @@ -109,27 +103,22 @@ func NewMsgTransfer(msgDatabase controller.CommonMsgDatabase, conversationRpcCli } func (m *MsgTransfer) Start(prometheusPort int) error { - ctx := context.Background() fmt.Println("start msg transfer", "prometheusPort:", prometheusPort) if prometheusPort <= 0 { return errs.Wrap(errors.New("prometheusPort not correct")) } + m.ctx, m.cancel = context.WithCancel(context.Background()) - var wg sync.WaitGroup + var ( + netDone = make(chan struct{}, 1) + netErr error + ) - wg.Add(1) - go func() { - defer wg.Done() - - m.historyCH.historyConsumerGroup.RegisterHandleAndConsumer(ctx, m.historyCH) - }() - - wg.Add(1) - go func() { - defer wg.Done() - - m.historyMongoCH.historyConsumerGroup.RegisterHandleAndConsumer(ctx, m.historyMongoCH) - }() + onError := func(ctx context.Context, err error, errInfo string) { + log.ZWarn(ctx, errInfo, err) + } + go m.historyCH.historyConsumerGroup.RegisterHandleAndConsumer(m.ctx, m.historyCH, onError) + go m.historyMongoCH.historyConsumerGroup.RegisterHandleAndConsumer(m.ctx, m.historyMongoCH, onError) if config.Config.Prometheus.Enable { go func() { @@ -141,30 +130,28 @@ func (m *MsgTransfer) Start(prometheusPort int) error { http.Handle("/metrics", promhttp.HandlerFor(proreg, promhttp.HandlerOpts{Registry: proreg})) err := http.ListenAndServe(fmt.Sprintf(":%d", prometheusPort), nil) if err != nil && err != http.ErrServerClosed { - panic(err) + netErr = errs.Wrap(err, fmt.Sprintf("prometheus start err: %d", prometheusPort)) + netDone <- struct{}{} } }() } sigs := make(chan os.Signal, 1) - signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT) - <-sigs - - // graceful close kafka client. - go m.historyCH.historyConsumerGroup.Close() - go m.historyMongoCH.historyConsumerGroup.Close() - - done := make(chan struct{}, 1) - go func() { - wg.Wait() - close(done) - }() - + signal.Notify(sigs, syscall.SIGTERM) select { - case <-done: - log.ZInfo(context.Background(), "msgtrasfer exit successfully") - case <-time.After(15 * time.Second): - log.ZError(context.Background(), "msgtransfer force to exit, timeout 15s", nil) + case <-sigs: + util.SIGUSR1Exit() + // graceful close kafka client. + m.cancel() + m.historyCH.historyConsumerGroup.Close() + m.historyMongoCH.historyConsumerGroup.Close() + + case <-netDone: + m.cancel() + m.historyCH.historyConsumerGroup.Close() + m.historyMongoCH.historyConsumerGroup.Close() + close(netDone) + return netErr } return nil diff --git a/internal/push/callback.go b/internal/push/callback.go index 99a58fb07..a572fa572 100644 --- a/internal/push/callback.go +++ b/internal/push/callback.go @@ -130,7 +130,7 @@ func callbackBeforeSuperGroupOnlinePush( if err := http.CallBackPostReturn(ctx, config.Config.Callback.CallbackUrl, req, resp, config.Config.Callback.CallbackBeforeSuperGroupOnlinePush); err != nil { return err } - return nil + if len(resp.UserIDs) != 0 { *pushToUserIDs = resp.UserIDs } diff --git a/internal/push/consumer_init.go b/internal/push/consumer_init.go index ceab86165..80478de99 100644 --- a/internal/push/consumer_init.go +++ b/internal/push/consumer_init.go @@ -14,7 +14,10 @@ package push -import "context" +import ( + "context" + "github.com/OpenIMSDK/tools/log" +) type Consumer struct { pushCh ConsumerHandler @@ -32,6 +35,9 @@ func NewConsumer(pusher *Pusher) (*Consumer, error) { } func (c *Consumer) Start() { + onError := func(ctx context.Context, err error, errInfo string) { + log.ZWarn(ctx, errInfo, err) + } + go c.pushCh.pushConsumerGroup.RegisterHandleAndConsumer(context.Background(), &c.pushCh, onError) - go c.pushCh.pushConsumerGroup.RegisterHandleAndConsumer(context.Background(), &c.pushCh) } diff --git a/internal/push/push_rpc_server.go b/internal/push/push_rpc_server.go index c1226ce6b..caaf95525 100644 --- a/internal/push/push_rpc_server.go +++ b/internal/push/push_rpc_server.go @@ -16,8 +16,6 @@ package push import ( "context" - "sync" - "github.com/OpenIMSDK/tools/utils" "google.golang.org/grpc" @@ -58,23 +56,18 @@ func Start(client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) e &groupRpcClient, &msgRpcClient, ) - var wg sync.WaitGroup - wg.Add(2) - go func() { - defer wg.Done() - pbpush.RegisterPushMsgServiceServer(server, &pushServer{ - pusher: pusher, - }) - }() + + pbpush.RegisterPushMsgServiceServer(server, &pushServer{ + pusher: pusher, + }) + consumer, err := NewConsumer(pusher) if err != nil { return err } - go func() { - defer wg.Done() - consumer.Start() - }() - wg.Wait() + + consumer.Start() + return nil } diff --git a/internal/tools/cron_task.go b/internal/tools/cron_task.go index 40e1c0a87..decc1aa82 100644 --- a/internal/tools/cron_task.go +++ b/internal/tools/cron_task.go @@ -64,7 +64,7 @@ func StartTask() error { crontab.Start() sigs := make(chan os.Signal, 1) - signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT) + signal.Notify(sigs, syscall.SIGTERM) <-sigs // stop crontab, Wait for the running task to exit. diff --git a/pkg/common/ginprometheus/ginprometheus.go b/pkg/common/ginprometheus/ginprometheus.go index 1ee8f8e34..c2e6bdcca 100644 --- a/pkg/common/ginprometheus/ginprometheus.go +++ b/pkg/common/ginprometheus/ginprometheus.go @@ -197,30 +197,32 @@ func (p *Prometheus) SetListenAddressWithRouter(listenAddress string, r *gin.Eng } // SetMetricsPath set metrics paths. -func (p *Prometheus) SetMetricsPath(e *gin.Engine) { +func (p *Prometheus) SetMetricsPath(e *gin.Engine) error { if p.listenAddress != "" { p.router.GET(p.MetricsPath, prometheusHandler()) - p.runServer() + return p.runServer() } else { e.GET(p.MetricsPath, prometheusHandler()) + return nil } } // SetMetricsPathWithAuth set metrics paths with authentication. -func (p *Prometheus) SetMetricsPathWithAuth(e *gin.Engine, accounts gin.Accounts) { +func (p *Prometheus) SetMetricsPathWithAuth(e *gin.Engine, accounts gin.Accounts) error { if p.listenAddress != "" { p.router.GET(p.MetricsPath, gin.BasicAuth(accounts), prometheusHandler()) - p.runServer() + return p.runServer() } else { e.GET(p.MetricsPath, gin.BasicAuth(accounts), prometheusHandler()) + return nil } } -func (p *Prometheus) runServer() { - go p.router.Run(p.listenAddress) +func (p *Prometheus) runServer() error { + return p.router.Run(p.listenAddress) } func (p *Prometheus) getMetrics() []byte { @@ -366,15 +368,15 @@ func (p *Prometheus) registerMetrics(subsystem string) { } // Use adds the middleware to a gin engine. -func (p *Prometheus) Use(e *gin.Engine) { +func (p *Prometheus) Use(e *gin.Engine) error { e.Use(p.HandlerFunc()) - p.SetMetricsPath(e) + return p.SetMetricsPath(e) } // UseWithAuth adds the middleware to a gin engine with BasicAuth. -func (p *Prometheus) UseWithAuth(e *gin.Engine, accounts gin.Accounts) { +func (p *Prometheus) UseWithAuth(e *gin.Engine, accounts gin.Accounts) error { e.Use(p.HandlerFunc()) - p.SetMetricsPathWithAuth(e, accounts) + return p.SetMetricsPathWithAuth(e, accounts) } // HandlerFunc defines handler function for middleware. diff --git a/pkg/common/kafka/consumer_group.go b/pkg/common/kafka/consumer_group.go index 5bff50d88..908b8f088 100644 --- a/pkg/common/kafka/consumer_group.go +++ b/pkg/common/kafka/consumer_group.go @@ -17,19 +17,16 @@ package kafka import ( "context" "errors" - "strings" - + "fmt" "github.com/IBM/sarama" "github.com/OpenIMSDK/tools/errs" "github.com/OpenIMSDK/tools/log" "github.com/openimsdk/open-im-server/v3/pkg/common/config" + "strings" ) type MConsumerGroup struct { - ctx context.Context - cancel context.CancelFunc - sarama.ConsumerGroup groupID string topics []string @@ -57,9 +54,7 @@ func NewMConsumerGroup(consumerConfig *MConsumerGroupConfig, topics, addrs []str return nil, errs.Wrap(err, strings.Join(topics, ","), strings.Join(addrs, ","), groupID, config.Config.Kafka.Username, config.Config.Kafka.Password) } - ctx, cancel := context.WithCancel(context.Background()) return &MConsumerGroup{ - ctx, cancel, consumerGroup, groupID, topics, @@ -70,27 +65,21 @@ func (mc *MConsumerGroup) GetContextFromMsg(cMsg *sarama.ConsumerMessage) contex return GetContextWithMQHeader(cMsg.Headers) } -func (mc *MConsumerGroup) RegisterHandleAndConsumer(ctx context.Context, handler sarama.ConsumerGroupHandler) { - log.ZDebug(context.Background(), "register consumer group", "groupID", mc.groupID) +func (mc *MConsumerGroup) RegisterHandleAndConsumer(ctx context.Context, handler sarama.ConsumerGroupHandler, onError func(context.Context, error, string)) { + log.ZDebug(ctx, "register consumer group", "groupID", mc.groupID) for { - err := mc.ConsumerGroup.Consume(mc.ctx, mc.topics, handler) - if errors.Is(err, sarama.ErrClosedConsumerGroup) { + err := mc.ConsumerGroup.Consume(ctx, mc.topics, handler) + if errors.Is(err, sarama.ErrClosedConsumerGroup) || errors.Is(err, context.Canceled) { return } - if mc.ctx.Err() != nil { - return - } - if err != nil { - log.ZWarn(ctx, "consume err", err, "topic", mc.topics, "groupID", mc.groupID) - } - if ctx.Err() != nil { + errInfo := fmt.Sprintf("consume err: %v, topic: %v, groupID: %s", err, strings.Join(mc.topics, ", "), mc.groupID) + onError(ctx, err, errInfo) // 调用回调函数处理错误 return } } } -func (mc *MConsumerGroup) Close() { - mc.cancel() - mc.ConsumerGroup.Close() +func (mc *MConsumerGroup) Close() error { + return mc.ConsumerGroup.Close() } diff --git a/pkg/common/startrpc/start.go b/pkg/common/startrpc/start.go index f6cda2ffb..4b032e9d6 100644 --- a/pkg/common/startrpc/start.go +++ b/pkg/common/startrpc/start.go @@ -15,8 +15,11 @@ package startrpc import ( + "context" "errors" "fmt" + "github.com/OpenIMSDK/tools/errs" + util "github.com/openimsdk/open-im-server/v3/pkg/util/genutil" "net" "net/http" "os" @@ -26,14 +29,10 @@ import ( "syscall" "time" - "github.com/OpenIMSDK/tools/errs" - - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promhttp" - "golang.org/x/sync/errgroup" - "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promhttp" grpcprometheus "github.com/grpc-ecosystem/go-grpc-prometheus" "google.golang.org/grpc" @@ -56,12 +55,13 @@ func Start( ) error { fmt.Printf("start %s server, port: %d, prometheusPort: %d, OpenIM version: %s\n", rpcRegisterName, rpcPort, prometheusPort, config.Version) + rpcTcpAddr := net.JoinHostPort(network.GetListenIP(config.Config.Rpc.ListenIP), strconv.Itoa(rpcPort)) listener, err := net.Listen( "tcp", - net.JoinHostPort(network.GetListenIP(config.Config.Rpc.ListenIP), strconv.Itoa(rpcPort)), + rpcTcpAddr, ) if err != nil { - return errs.Wrap(err, network.GetListenIP(config.Config.Rpc.ListenIP), strconv.Itoa(rpcPort)) + return errs.Wrap(err, "rpc start err", rpcTcpAddr) } defer listener.Close() @@ -108,46 +108,64 @@ func Start( return errs.Wrap(err) } - var wg errgroup.Group - - wg.Go(func() error { + var ( + netDone = make(chan struct{}, 2) + netErr error + httpServer *http.Server + ) + go func() { if config.Config.Prometheus.Enable && prometheusPort != 0 { metric.InitializeMetrics(srv) // Create a HTTP server for prometheus. - httpServer := &http.Server{Handler: promhttp.HandlerFor(reg, promhttp.HandlerOpts{}), Addr: fmt.Sprintf("0.0.0.0:%d", prometheusPort)} - if err := httpServer.ListenAndServe(); err != nil { - fmt.Fprintf(os.Stderr, "\n\nexit -1: \n%+v PrometheusPort: %d \n\n", err, prometheusPort) - os.Exit(-1) + httpServer = &http.Server{Handler: promhttp.HandlerFor(reg, promhttp.HandlerOpts{}), Addr: fmt.Sprintf("0.0.0.0:%d", prometheusPort)} + if err := httpServer.ListenAndServe(); err != nil && err != http.ErrServerClosed { + netErr = errs.Wrap(err, "prometheus start err", httpServer.Addr) + netDone <- struct{}{} } } - return nil - }) + }() - wg.Go(func() error { - return errs.Wrap(srv.Serve(listener)) - }) + go func() { + err := srv.Serve(listener) + if err != nil { + netErr = errs.Wrap(err, "rpc start err: ", rpcTcpAddr) + netDone <- struct{}{} + } + }() sigs := make(chan os.Signal, 1) - signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT) - <-sigs - - var ( - done = make(chan struct{}, 1) - gerr error - ) + signal.Notify(sigs, syscall.SIGTERM) + select { + case <-sigs: + util.SIGUSR1Exit() + ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) + defer cancel() + if err := gracefulStopWithCtx(ctx, srv.GracefulStop); err != nil { + return err + } + ctx, cancel = context.WithTimeout(context.Background(), 15*time.Second) + defer cancel() + err := httpServer.Shutdown(ctx) + if err != nil { + return errs.Wrap(err, "shutdown err") + } + return errors.New("SIGTERM EXIT") + case <-netDone: + close(netDone) + return netErr + } +} +func gracefulStopWithCtx(ctx context.Context, f func()) error { + done := make(chan struct{}, 1) go func() { - once.Do(srv.GracefulStop) - gerr = wg.Wait() + f() close(done) }() - select { + case <-ctx.Done(): + return errs.Wrap(errors.New("timeout, ctx graceful stop")) case <-done: - return gerr - - case <-time.After(15 * time.Second): - return errs.Wrap(errors.New("timeout exit")) + return nil } - } diff --git a/pkg/util/genutil/genutil.go b/pkg/util/genutil/genutil.go index 0948a7c49..f97b803f6 100644 --- a/pkg/util/genutil/genutil.go +++ b/pkg/util/genutil/genutil.go @@ -15,6 +15,7 @@ package genutil import ( + "errors" "fmt" "os" "path/filepath" @@ -39,3 +40,17 @@ func OutDir(path string) (string, error) { outDir += "/" return outDir, nil } + +func ExitWithError(err error) { + if errors.Is(err, errors.New("SIGTERM EXIT")) { + os.Exit(-1) + } + progName := filepath.Base(os.Args[0]) + fmt.Fprintf(os.Stderr, "\n\n%s exit -1: \n%+v\n\n", progName, err) + os.Exit(-1) +} + +func SIGUSR1Exit() { + progName := filepath.Base(os.Args[0]) + fmt.Printf("\n\n%s receive process terminal SIGTERM exit 0\n\n", progName) +} diff --git a/scripts/githooks/pre-commit.sh b/scripts/githooks/pre-commit.sh index cc756c9ad..d8396b560 100644 --- a/scripts/githooks/pre-commit.sh +++ b/scripts/githooks/pre-commit.sh @@ -105,7 +105,7 @@ fi if [[ ! $local_branch =~ $valid_branch_regex ]] then printError "There is something wrong with your branch name. Branch names in this project must adhere to this contract: $valid_branch_regex. -Your commit will be rejected. You should rename your branch to a valid name(feat/name OR bug/name) and try again." +Your commit will be rejected. You should rename your branch to a valid name(feat/name OR fix/name) and try again." printError "For more on this, read on: https://gist.github.com/cubxxw/126b72104ac0b0ca484c9db09c3e5694" exit 1 fi \ No newline at end of file diff --git a/scripts/install/openim-push.sh b/scripts/install/openim-push.sh index ab12735c1..95da16c8a 100755 --- a/scripts/install/openim-push.sh +++ b/scripts/install/openim-push.sh @@ -73,7 +73,7 @@ function openim::push::start() { for (( i=0; i<${#OPENIM_PUSH_PORTS_ARRAY[@]}; i++ )); do openim::log::info "start push process, port: ${OPENIM_PUSH_PORTS_ARRAY[$i]}, prometheus port: ${PUSH_PROM_PORTS_ARRAY[$i]}" - nohup ${OPENIM_PUSH_BINARY} --port ${OPENIM_PUSH_PORTS_ARRAY[$i]} -c ${OPENIM_PUSH_CONFIG} --prometheus_port ${PUSH_PROM_PORTS_ARRAY[$i]} >${LOG_FILE} 2> >(tee -a "${STDERR_LOG_FILE}" "$TMP_LOG_FILE" >&2) & + nohup ${OPENIM_PUSH_BINARY} --port ${OPENIM_PUSH_PORTS_ARRAY[$i]} -c ${OPENIM_PUSH_CONFIG} --prometheus_port ${PUSH_PROM_PORTS_ARRAY[$i]} >> ${LOG_FILE} 2> >(tee -a "${STDERR_LOG_FILE}" "$TMP_LOG_FILE" >&2) & done openim::util::check_process_names ${SERVER_NAME} diff --git a/scripts/lib/util.sh b/scripts/lib/util.sh index 7acb1fcdd..1bdb7f640 100755 --- a/scripts/lib/util.sh +++ b/scripts/lib/util.sh @@ -486,7 +486,7 @@ openim::util::stop_services_on_ports() { local pid=$(echo $line | awk '{print $2}') # Try to stop the service by killing its process. - if kill -10 $pid; then + if kill -15 $pid; then stopped+=($port) else not_stopped+=($port) @@ -561,7 +561,7 @@ openim::util::stop_services_with_name() { # If there's a Process ID, it means the service with the name is running. if [[ -n $pid ]]; then # Try to stop the service by killing its process. - if kill -10 $pid 2>/dev/null; then + if kill -15 $pid 2>/dev/null; then stopped_this_time=true fi fi