fix: reconstruct exit gracefully

pull/1885/head
luhaoling 2 years ago
parent dc5504d203
commit 482227552e

@ -89,26 +89,34 @@ func run(port int, proPort int) error {
} else { } else {
address = net.JoinHostPort("0.0.0.0", strconv.Itoa(port)) address = net.JoinHostPort("0.0.0.0", strconv.Itoa(port))
} }
var (
netDone = make(chan struct{}, 1)
netErr error
)
server := http.Server{Addr: address, Handler: router} server := http.Server{Addr: address, Handler: router}
go func() { go func() {
err = server.ListenAndServe() err = server.ListenAndServe()
if err != nil && err != http.ErrServerClosed { if err != nil && err != http.ErrServerClosed {
os.Exit(1) netErr = errs.Wrap(err, "api start err: ", server.Addr)
close(netDone)
} }
}() }()
sigs := make(chan os.Signal, 1) sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT) signal.Notify(sigs, syscall.SIGUSR1)
<-sigs
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
defer cancel() defer cancel()
select {
// graceful shutdown operation. case <-sigs:
if err := server.Shutdown(ctx); err != nil { print("receive process terminal SIGUSR1 exit")
return err err := server.Shutdown(ctx)
if err != nil {
return errs.Wrap(err, "shutdown err")
}
case <-netDone:
return netErr
} }
return nil return nil
} }

@ -18,9 +18,6 @@ import (
"fmt" "fmt"
"time" "time"
"github.com/OpenIMSDK/tools/utils"
"golang.org/x/sync/errgroup"
"github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/config"
) )
@ -46,20 +43,12 @@ func RunWsAndServer(rpcPort, wsPort, prometheusPort int) error {
} }
hubServer := NewServer(rpcPort, prometheusPort, longServer) hubServer := NewServer(rpcPort, prometheusPort, longServer)
netDone := make(chan error)
wg := errgroup.Group{} go func() {
wg.Go(func() error {
err = hubServer.Start() err = hubServer.Start()
if err != nil { if err != nil {
return utils.Wrap1(err) netDone <- err
} }
return err }()
}) return hubServer.LongConnServer.Run(netDone)
wg.Go(func() error {
return hubServer.LongConnServer.Run()
})
err = wg.Wait()
return err
} }

@ -20,12 +20,9 @@ import (
"errors" "errors"
"fmt" "fmt"
"net/http" "net/http"
"os"
"os/signal"
"strconv" "strconv"
"sync" "sync"
"sync/atomic" "sync/atomic"
"syscall"
"time" "time"
"github.com/OpenIMSDK/tools/apiresp" "github.com/OpenIMSDK/tools/apiresp"
@ -49,7 +46,7 @@ import (
) )
type LongConnServer interface { type LongConnServer interface {
Run() error Run(done chan error) error
wsHandler(w http.ResponseWriter, r *http.Request) wsHandler(w http.ResponseWriter, r *http.Request)
GetUserAllCons(userID string) ([]*Client, bool) GetUserAllCons(userID string) ([]*Client, bool)
GetUserPlatformCons(userID string, platform int) ([]*Client, bool, bool) GetUserPlatformCons(userID string, platform int) ([]*Client, bool, bool)
@ -169,23 +166,20 @@ func NewWsServer(opts ...Option) (*WsServer, error) {
}, nil }, nil
} }
func (ws *WsServer) Run() error { func (ws *WsServer) Run(done chan error) error {
var ( var (
client *Client client *Client
wg errgroup.Group netErr error
shutdownDone = make(chan struct{}, 1)
sigs = make(chan os.Signal, 1)
done = make(chan struct{}, 1)
) )
server := http.Server{Addr: ":" + utils.IntToString(ws.port), Handler: nil} server := http.Server{Addr: ":" + utils.IntToString(ws.port), Handler: nil}
wg.Go(func() error { go func() {
for { for {
select { select {
case <-done: case <-shutdownDone:
return nil return
case client = <-ws.registerChan: case client = <-ws.registerChan:
ws.registerClient(client) ws.registerClient(client)
case client = <-ws.unregisterChan: case client = <-ws.unregisterChan:
@ -194,33 +188,32 @@ func (ws *WsServer) Run() error {
ws.multiTerminalLoginChecker(onlineInfo.clientOK, onlineInfo.oldClients, onlineInfo.newClient) ws.multiTerminalLoginChecker(onlineInfo.clientOK, onlineInfo.oldClients, onlineInfo.newClient)
} }
} }
}) }()
netDone := make(chan struct{}, 1)
wg.Go(func() error {
http.HandleFunc("/", ws.wsHandler)
return server.ListenAndServe()
})
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
<-sigs
go func() { go func() {
http.HandleFunc("/", ws.wsHandler)
err := server.ListenAndServe()
if err != nil && err != http.ErrServerClosed {
netErr = errs.Wrap(err, "ws start err: ", server.Addr)
close(netDone)
}
}()
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
defer cancel() defer cancel()
var err error
// graceful exit operation for server
_ = server.Shutdown(ctx)
_ = wg.Wait()
close(done)
}()
select { select {
case <-done: case err = <-done:
return nil sErr := server.Shutdown(ctx)
if sErr != nil {
case <-time.After(15 * time.Second): return errs.Wrap(sErr, "shutdown err")
return utils.Wrap1(errors.New("timeout exit")) }
close(shutdownDone)
if err != nil {
return err
}
case <-netDone:
} }
return netErr
} }

@ -15,9 +15,10 @@
package startrpc package startrpc
import ( import (
"context"
"errors" "errors"
"fmt" "fmt"
"log" "github.com/OpenIMSDK/tools/errs"
"net" "net"
"net/http" "net/http"
"os" "os"
@ -27,14 +28,10 @@ import (
"syscall" "syscall"
"time" "time"
"github.com/OpenIMSDK/tools/errs"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"golang.org/x/sync/errgroup"
"github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
grpcprometheus "github.com/grpc-ecosystem/go-grpc-prometheus" grpcprometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"google.golang.org/grpc" "google.golang.org/grpc"
@ -57,12 +54,13 @@ func Start(
) error { ) error {
fmt.Printf("start %s server, port: %d, prometheusPort: %d, OpenIM version: %s\n", fmt.Printf("start %s server, port: %d, prometheusPort: %d, OpenIM version: %s\n",
rpcRegisterName, rpcPort, prometheusPort, config.Version) rpcRegisterName, rpcPort, prometheusPort, config.Version)
rpcTcpAddr := net.JoinHostPort(network.GetListenIP(config.Config.Rpc.ListenIP), strconv.Itoa(rpcPort))
listener, err := net.Listen( listener, err := net.Listen(
"tcp", "tcp",
net.JoinHostPort(network.GetListenIP(config.Config.Rpc.ListenIP), strconv.Itoa(rpcPort)), rpcTcpAddr,
) )
if err != nil { if err != nil {
return errs.Wrap(err, network.GetListenIP(config.Config.Rpc.ListenIP), strconv.Itoa(rpcPort)) return errs.Wrap(err, rpcTcpAddr)
} }
defer listener.Close() defer listener.Close()
@ -109,48 +107,63 @@ func Start(
return errs.Wrap(err) return errs.Wrap(err)
} }
var wg errgroup.Group var (
netDone = make(chan struct{}, 1)
wg.Go(func() error { netErr error
httpServer *http.Server
)
go func() {
if config.Config.Prometheus.Enable && prometheusPort != 0 { if config.Config.Prometheus.Enable && prometheusPort != 0 {
metric.InitializeMetrics(srv) metric.InitializeMetrics(srv)
// Create a HTTP server for prometheus. // Create a HTTP server for prometheus.
httpServer := &http.Server{Handler: promhttp.HandlerFor(reg, promhttp.HandlerOpts{}), Addr: fmt.Sprintf("0.0.0.0:%d", prometheusPort)} httpServer = &http.Server{Handler: promhttp.HandlerFor(reg, promhttp.HandlerOpts{}), Addr: fmt.Sprintf("0.0.0.0:%d", prometheusPort)}
if err := httpServer.ListenAndServe(); err != nil { if err := httpServer.ListenAndServe(); err != nil && err != http.ErrServerClosed {
log.Fatal("Unable to start a http server. ", err.Error(), "PrometheusPort:", prometheusPort) netErr = errs.Wrap(err, "prometheus start err: ", httpServer.Addr)
close(netDone)
} }
} }
return nil }()
})
wg.Go(func() error { go func() {
return errs.Wrap(srv.Serve(listener)) err := srv.Serve(listener)
}) if err != nil {
netErr = errs.Wrap(err, "rpc start err: ", rpcTcpAddr)
close(netDone)
}
}()
sigs := make(chan os.Signal, 1) sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT) signal.Notify(sigs, syscall.SIGUSR1)
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
log.Println("23333333333:", <-sigs) defer cancel()
select {
<-sigs case <-sigs:
print("receive process terminal SIGUSR1 exit")
var ( if err := gracefulStopWithCtx(ctx, srv.GracefulStop); err != nil {
done = make(chan struct{}, 1) return err
gerr error }
) ctx, cancel = context.WithTimeout(context.Background(), 15*time.Second)
defer cancel()
err := httpServer.Shutdown(ctx)
if err != nil {
return errs.Wrap(err, "shutdown err")
}
case <-netDone:
return netErr
}
return nil
}
func gracefulStopWithCtx(ctx context.Context, f func()) error {
done := make(chan struct{}, 1)
go func() { go func() {
once.Do(srv.GracefulStop) f()
gerr = wg.Wait()
close(done) close(done)
}() }()
select { select {
case <-ctx.Done():
return errs.Wrap(errors.New("timeout"), "ctx graceful stop")
case <-done: case <-done:
return gerr return nil
case <-time.After(15 * time.Second):
return errs.Wrap(errors.New("timeout exit"))
} }
} }

Loading…
Cancel
Save