mir: optimize app start service logic to fixed mutilple service serve in sample port will occurs error when stop app

pull/196/head
Michael Li 2 years ago
parent f2265bc760
commit ba27352c52
No known key found for this signature in database

@ -20,9 +20,7 @@ import (
"github.com/rocboss/paopao-ce/pkg/xerror"
)
type BaseServant struct {
// TODO
}
type BaseServant types.Empty
type DaoServant struct {
Redis *redis.Client

@ -33,7 +33,7 @@ func (s *adminService) Version() *semver.Version {
}
func (s *adminService) OnInit() error {
s.registerRoute(servants.RegisterAdminServants)
s.registerRoute(s, servants.RegisterAdminServants)
return nil
}

@ -33,7 +33,7 @@ func (s *botService) Version() *semver.Version {
}
func (s *botService) OnInit() error {
s.registerRoute(servants.RegisterBotServants)
s.registerRoute(s, servants.RegisterBotServants)
return nil
}

@ -31,7 +31,7 @@ func (s *docsService) Version() *semver.Version {
}
func (s *docsService) OnInit() error {
s.registerRoute(servants.RegisterDocsServants)
s.registerRoute(s, servants.RegisterDocsServants)
return nil
}

@ -31,7 +31,7 @@ func (s *frontendWebService) Version() *semver.Version {
}
func (s *frontendWebService) OnInit() error {
s.registerRoute(servants.RegisterFrontendWebServants)
s.registerRoute(s, servants.RegisterFrontendWebServants)
return nil
}

@ -23,32 +23,10 @@ type grpcServer struct {
}
func (s *grpcServer) start() error {
s.Lock()
if s.serverStatus == _statusServerStarted || s.serverStatus == _statusServerStoped {
return nil
}
oldStatus := s.serverStatus
s.serverStatus = _statusServerStarted
s.Unlock()
if err := s.server.Serve(s.listener); err != nil {
s.Lock()
s.serverStatus = oldStatus
s.Unlock()
return err
}
return nil
return s.server.Serve(s.listener)
}
func (s *grpcServer) stop() error {
s.Lock()
defer s.Unlock()
if s.serverStatus == _statusServerStoped || s.serverStatus == _statusServerInitilized {
return nil
}
s.server.Stop()
s.serverStatus = _statusServerStoped
return nil
}

@ -14,19 +14,17 @@ type baseGRPCService struct {
server *grpcServer
}
func (s *baseGRPCService) registerServer(h func(s *grpc.Server)) {
if s.server.status() != _statusServerStarted {
func (s *baseGRPCService) registerServer(srv Service, h func(s *grpc.Server)) {
h(s.server.server)
}
s.server.addService(srv)
}
func (s *baseGRPCService) OnStart() error {
if err := s.server.start(); err != nil {
return err
}
// do nothing default
return nil
}
func (s *baseGRPCService) OnStop() error {
return s.server.stop()
// do nothing default
return nil
}

@ -24,34 +24,9 @@ type httpServer struct {
}
func (s *httpServer) start() error {
s.Lock()
if s.serverStatus == _statusServerStarted || s.serverStatus == _statusServerStoped {
return nil
}
oldStatus := s.serverStatus
s.serverStatus = _statusServerStarted
s.Unlock()
if err := s.server.ListenAndServe(); err != nil {
s.Lock()
s.serverStatus = oldStatus
s.Unlock()
return err
}
return nil
return s.server.ListenAndServe()
}
func (s *httpServer) stop() error {
s.Lock()
defer s.Unlock()
if s.serverStatus == _statusServerStoped || s.serverStatus == _statusServerInitilized {
return nil
}
if err := s.server.Shutdown(context.Background()); err != nil {
return err
}
s.serverStatus = _statusServerStoped
return nil
return s.server.Shutdown(context.Background())
}

@ -14,19 +14,17 @@ type baseHttpService struct {
server *httpServer
}
func (s *baseHttpService) registerRoute(h func(e *gin.Engine)) {
if s.server.status() != _statusServerStarted {
func (s *baseHttpService) registerRoute(srv Service, h func(e *gin.Engine)) {
h(s.server.e)
}
s.server.addService(srv)
}
func (s *baseHttpService) OnStart() error {
if err := s.server.start(); err != nil {
return err
}
// do nothing default
return nil
}
func (s *baseHttpService) OnStop() error {
return s.server.stop()
// do nothing default
return nil
}

@ -32,7 +32,7 @@ func (s *localossService) Version() *semver.Version {
}
func (s *localossService) OnInit() error {
s.registerRoute(servants.RegisterLocalossServants)
s.registerRoute(s, servants.RegisterLocalossServants)
return nil
}

@ -33,7 +33,7 @@ func (s *mobileService) Version() *semver.Version {
}
func (s *mobileService) OnInit() error {
s.registerServer(servants.RegisterMobileServants)
s.registerServer(s, servants.RegisterMobileServants)
return nil
}

@ -4,7 +4,15 @@
package service
import "sync"
import (
"fmt"
"sync"
"github.com/fatih/color"
"github.com/gin-gonic/gin"
"github.com/rocboss/paopao-ce/internal/conf"
"github.com/rocboss/paopao-ce/pkg/util"
)
var (
httpServers = newServerPool[*httpServer]()
@ -12,17 +20,16 @@ var (
)
const (
_statusServerUnknow uint8 = iota
_statusServerInitilized
_statusServerStarted
_statusServerStoped
actOnStart byte = iota
actOnStop
actStart
actStop
)
type server interface {
status() uint8
setStatus(uint8)
start() error
stop() error
services() []Service
}
type serverPool[T server] struct {
@ -30,8 +37,7 @@ type serverPool[T server] struct {
}
type baseServer struct {
sync.RWMutex
serverStatus uint8
ss map[string]Service
}
func (p *serverPool[T]) from(addr string, newServer func() T) T {
@ -40,23 +46,60 @@ func (p *serverPool[T]) from(addr string, newServer func() T) T {
return s
}
s = newServer()
s.setStatus(_statusServerInitilized)
p.servers[addr] = s
return s
}
func (s *baseServer) setStatus(status uint8) {
s.RLock()
defer s.RUnlock()
func (p *serverPool[T]) startServer(wg *sync.WaitGroup, maxSidSize int) {
for _, srv := range p.servers {
wg.Add(1)
go func(t T) {
ss := t.services()
if len(ss) < 1 {
return
}
for _, s := range ss {
colorPrint(actOnStart, s.OnStart(), maxSidSize, s)
}
colorPrint(actStart, t.start(), maxSidSize, ss...)
// remember to done sync.WaitGroup
wg.Done()
}(srv)
}
}
s.serverStatus = status
func (p *serverPool[T]) stopServer(maxSidSize int) {
for _, srv := range p.servers {
ss := srv.services()
if len(ss) < 1 {
return
}
for _, s := range ss {
colorPrint(actOnStop, s.OnStop(), maxSidSize, s)
}
colorPrint(actStop, srv.stop(), maxSidSize, ss...)
}
}
func (s *baseServer) status() uint8 {
s.RLock()
defer s.RUnlock()
func (p *serverPool[T]) allServices() (ss []Service) {
for _, srv := range p.servers {
ss = append(ss, srv.services()...)
}
return
}
return s.serverStatus
func (s *baseServer) addService(srv Service) {
if srv != nil {
sid := srv.Name() + "@" + srv.Version().String()
s.ss[sid] = srv
}
}
func (s *baseServer) services() (ss []Service) {
for _, s := range s.ss {
ss = append(ss, s)
}
return
}
func newServerPool[T server]() *serverPool[T] {
@ -67,7 +110,81 @@ func newServerPool[T server]() *serverPool[T] {
func newBaseServe() *baseServer {
return &baseServer{
RWMutex: sync.RWMutex{},
serverStatus: _statusServerUnknow,
ss: make(map[string]Service),
}
}
func checkServices() (int, int) {
var ss []Service
ss = append(ss, httpServers.allServices()...)
ss = append(ss, grpcServers.allServices()...)
return len(ss), maxSidSize(ss)
}
// maxSidSize max service id string length
func maxSidSize(ss []Service) int {
length := 0
for _, s := range ss {
size := len(s.Name() + "@" + s.Version().String())
if size > length {
length = size
}
}
return length
}
func colorPrint(act byte, err error, l int, ss ...Service) {
s := ss[0]
switch act {
case actOnStart:
if err == nil {
fmt.Fprintf(color.Output, "%s [start] - %s", util.SidStr(s.Name(), s.Version(), l), s)
} else {
fmt.Fprintf(color.Output, "%s [start] - run OnStart error: %s\n", util.SidStr(s.Name(), s.Version(), l), err)
}
case actOnStop:
if err == nil {
fmt.Fprintf(color.Output, "%s [stop] - finish...\n", util.SidStr(s.Name(), s.Version(), l))
} else {
fmt.Fprintf(color.Output, "%s [stop] - run OnStop error: %s\n", util.SidStr(s.Name(), s.Version(), l), err)
}
case actStart:
if err != nil {
for _, s = range ss {
fmt.Fprintf(color.Output, "%s [start] - starting server occurs error:: %s\n", util.SidStr(s.Name(), s.Version(), l), err)
}
}
case actStop:
if err != nil {
for _, s = range ss {
fmt.Fprintf(color.Output, "%s [stop] - stopping server occurs error:: %s\n", util.SidStr(s.Name(), s.Version(), l), err)
}
}
}
}
// Start start all servers
func Start(wg *sync.WaitGroup) {
srvSize, maxSidSize := checkServices()
if srvSize < 1 {
return
}
// some initialize for server engine
gin.SetMode(conf.RunMode())
// start servers
httpServers.startServer(wg, maxSidSize)
grpcServers.startServer(wg, maxSidSize)
}
// Stop stop all servers
func Stop() {
srvSize, maxSidSize := checkServices()
if srvSize < 1 {
return
}
// stop servers
httpServers.stopServer(maxSidSize)
grpcServers.stopServer(maxSidSize)
}

@ -34,8 +34,8 @@ func (baseService) String() string {
return ""
}
// InitService Initial service
func InitService() []Service {
// MustInitService Initial service
func MustInitService() []Service {
ss := newService()
for _, s := range ss {
if err := s.OnInit(); err != nil {
@ -45,18 +45,6 @@ func InitService() []Service {
return ss
}
// MaxSidSize max service id string length
func MaxSidSize(ss []Service) int {
length := 0
for _, s := range ss {
size := len(s.Name() + "@" + s.Version().String())
if size > length {
length = size
}
}
return length
}
func newService() (ss []Service) {
// add all service if declared in features on config.yaml
cfg.In(cfg.Actions{

@ -33,7 +33,7 @@ func (s *spaceXService) Version() *semver.Version {
}
func (s *spaceXService) OnInit() error {
s.registerRoute(servants.RegisterSpaceXServants)
s.registerRoute(s, servants.RegisterSpaceXServants)
return nil
}

@ -33,7 +33,7 @@ func (s *webService) Version() *semver.Version {
}
func (s *webService) OnInit() error {
s.registerRoute(servants.RegisterWebServants)
s.registerRoute(s, servants.RegisterWebServants)
return nil
}
@ -88,7 +88,6 @@ func newWebService() Service {
},
}
})
return &webService{
baseHttpService: &baseHttpService{
server: server,

@ -31,7 +31,7 @@ func (s *oldWebService) Version() *semver.Version {
}
func (s *oldWebService) OnInit() error {
s.registerRoute(routers.RegisterRoute)
s.registerRoute(s, routers.RegisterRoute)
return nil
}

@ -14,7 +14,6 @@ import (
"syscall"
"github.com/fatih/color"
"github.com/gin-gonic/gin"
"github.com/rocboss/paopao-ce/internal"
"github.com/rocboss/paopao-ce/internal/conf"
"github.com/rocboss/paopao-ce/internal/service"
@ -53,23 +52,20 @@ func flagParse() {
flag.Parse()
}
func runService(wg *sync.WaitGroup, ss []service.Service) {
gin.SetMode(conf.RunMode())
fmt.Fprintf(color.Output, "\nstarting run service...\n\n")
l := service.MaxSidSize(ss)
for _, s := range ss {
go func(s service.Service) {
fmt.Fprintf(color.Output, "%s [start] - %s", util.SidStr(s.Name(), s.Version(), l), s)
if err := s.OnStart(); err != nil {
fmt.Fprintf(color.Output, "%s [start] - occurs on error: %s\n", util.SidStr(s.Name(), s.Version(), l), err)
}
wg.Done()
}(s)
}
func main() {
util.PrintHelloBanner(debug.VersionInfo())
ss := service.MustInitService()
if len(ss) < 1 {
fmt.Fprintln(color.Output, "no service need start so just exit")
return
}
func runManage(wg *sync.WaitGroup, ss []service.Service) {
wg := &sync.WaitGroup{}
// start services
fmt.Fprintf(color.Output, "\nstarting run service...\n\n")
service.Start(wg)
// graceful stop services
wg.Add(1)
go func() {
quit := make(chan os.Signal, 1)
// kill (no param) default send syscall.SIGTERM
// kill -2 is syscall.SIGINT
@ -77,28 +73,8 @@ func runManage(wg *sync.WaitGroup, ss []service.Service) {
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
<-quit
fmt.Fprintf(color.Output, "\nshutting down server...\n\n")
l := service.MaxSidSize(ss)
for _, s := range ss {
if err := s.OnStop(); err != nil {
fmt.Fprintf(color.Output, "%s [stop] - occurs on error: %s\n", util.SidStr(s.Name(), s.Version(), l), err)
}
fmt.Fprintf(color.Output, "%s [stop] - finish...\n", util.SidStr(s.Name(), s.Version(), l))
}
service.Stop()
wg.Done()
}
func main() {
util.PrintHelloBanner(debug.VersionInfo())
if ss := service.InitService(); len(ss) > 0 {
wg := &sync.WaitGroup{}
wg.Add(len(ss) + 1)
runService(wg, ss)
go runManage(wg, ss)
}()
wg.Wait()
} else {
fmt.Fprintln(color.Output, "no service need start so just exit")
}
}

Loading…
Cancel
Save