add JobManager component in events package

x/helm
Michael Li 10 months ago
parent 0ad77cf17b
commit 2ae7d4d02a
No known key found for this signature in database

@ -30,6 +30,7 @@ require (
github.com/onsi/gomega v1.27.10
github.com/pyroscope-io/client v0.7.2
github.com/redis/rueidis v1.0.17
github.com/robfig/cron/v3 v3.0.1
github.com/sirupsen/logrus v1.9.3
github.com/smartwalle/alipay/v3 v3.2.16
github.com/sourcegraph/conc v0.3.0

@ -1125,6 +1125,8 @@ github.com/remyoudompheng/bigfft v0.0.0-20190728182440-6a916e37a237/go.mod h1:qq
github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo=
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE=
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo=
github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
github.com/rogpeppe/go-internal v1.1.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=

@ -7,55 +7,20 @@ package events
import (
"sync"
"github.com/alimy/tryst/event"
"github.com/alimy/tryst/pool"
"github.com/rocboss/paopao-ce/internal/conf"
"github.com/alimy/tryst/cfg"
"github.com/sirupsen/logrus"
)
var (
_defaultEventManager event.EventManager
_onceInitial sync.Once
_onceInitial sync.Once
)
// OnEvent push event to gorotine pool then handled automatic.
func OnEvent(event event.Event) {
_defaultEventManager.OnEvent(event)
}
func Initial() {
_onceInitial.Do(func() {
var opts []pool.Option
s := conf.EventManagerSetting
if s.MinWorker > 5 {
opts = append(opts, pool.MinWorkerOpt(s.MinWorker))
} else {
opts = append(opts, pool.MinWorkerOpt(5))
}
if s.MaxEventBuf > 10 {
opts = append(opts, pool.MaxRequestBufOpt(s.MaxEventBuf))
} else {
opts = append(opts, pool.MaxRequestBufOpt(10))
}
if s.MaxTempEventBuf > 10 {
opts = append(opts, pool.MaxRequestTempBufOpt(s.MaxTempEventBuf))
} else {
opts = append(opts, pool.MaxRequestTempBufOpt(10))
initEventManager()
if cfg.If("JobManager") {
initJobManager()
logrus.Debugln("initial JobManager")
}
opts = append(opts, pool.MaxTickCountOpt(s.MaxTickCount), pool.TickWaitTimeOpt(s.TickWaitTime))
_defaultEventManager = event.NewEventManager(func(req event.Event, err error) {
if err != nil {
logrus.Errorf("handle event[%s] occurs error: %s", req.Name(), err)
}
}, opts...)
})
}
func Restart() {
_defaultEventManager.Stop()
_defaultEventManager.Start()
}
func Done() {
_defaultEventManager.Stop()
}

@ -0,0 +1,110 @@
// Copyright 2023 ROC. All rights reserved.
// Use of this source code is governed by a MIT style
// license that can be found in the LICENSE file.
package events
import (
"github.com/robfig/cron/v3"
)
var (
_defaultJobManager JobManager = (*jobManager)(nil)
)
type (
EntryID = cron.EntryID
)
// JobFn job help function that implement cron.Job interface
type JobFn func()
func (fn JobFn) Run() {
fn()
}
// Job job interface
type Job interface {
cron.Schedule
cron.Job
}
type simpleJob struct {
cron.Schedule
cron.Job
}
// JobManager job manger interface
type JobManager interface {
Start()
Stop()
Remove(id EntryID)
Schedule(Job) EntryID
}
type jobManager struct {
m *cron.Cron
}
func (j *jobManager) Start() {
j.m.Start()
}
func (j *jobManager) Stop() {
j.m.Stop()
}
// Remove an entry from being run in the future.
func (j *jobManager) Remove(id EntryID) {
j.m.Remove(id)
}
// Schedule adds a Job to the Cron to be run on the given schedule.
// The job is wrapped with the configured Chain.
func (j *jobManager) Schedule(job Job) EntryID {
return j.m.Schedule(job, job)
}
func initJobManager() {
_defaultJobManager = &jobManager{
m: cron.New(),
}
StartJobManager()
}
func StartJobManager() {
_defaultJobManager.Start()
}
func StopJobManager() {
_defaultJobManager.Stop()
}
// NewJob create new Job instance
func NewJob(s cron.Schedule, fn JobFn) Job {
return &simpleJob{
Schedule: s,
Job: fn,
}
}
// RemoveJob an entry from being run in the future.
func RemoveJob(id EntryID) {
_defaultJobManager.Remove(id)
}
// ScheduleJob adds a Job to the Cron to be run on the given schedule.
// The job is wrapped with the configured Chain.
func ScheduleJob(job Job) EntryID {
return _defaultJobManager.Schedule(job)
}
// Schedule adds a Job to the Cron to be run on the given schedule.
// The job is wrapped with the configured Chain.
func Schedule(s cron.Schedule, fn JobFn) EntryID {
job := &simpleJob{
Schedule: s,
Job: fn,
}
return _defaultJobManager.Schedule(job)
}

@ -0,0 +1,55 @@
// Copyright 2023 ROC. All rights reserved.
// Use of this source code is governed by a MIT style
// license that can be found in the LICENSE file.
package events
import (
"github.com/alimy/tryst/event"
"github.com/alimy/tryst/pool"
"github.com/rocboss/paopao-ce/internal/conf"
"github.com/sirupsen/logrus"
)
var (
_defaultEventManager event.EventManager
)
func initEventManager() {
var opts []pool.Option
s := conf.EventManagerSetting
if s.MinWorker > 5 {
opts = append(opts, pool.MinWorkerOpt(s.MinWorker))
} else {
opts = append(opts, pool.MinWorkerOpt(5))
}
if s.MaxEventBuf > 10 {
opts = append(opts, pool.MaxRequestBufOpt(s.MaxEventBuf))
} else {
opts = append(opts, pool.MaxRequestBufOpt(10))
}
if s.MaxTempEventBuf > 10 {
opts = append(opts, pool.MaxRequestTempBufOpt(s.MaxTempEventBuf))
} else {
opts = append(opts, pool.MaxRequestTempBufOpt(10))
}
opts = append(opts, pool.MaxTickCountOpt(s.MaxTickCount), pool.TickWaitTimeOpt(s.TickWaitTime))
_defaultEventManager = event.NewEventManager(func(req event.Event, err error) {
if err != nil {
logrus.Errorf("handle event[%s] occurs error: %s", req.Name(), err)
}
}, opts...)
}
func StartEventManager() {
_defaultEventManager.Start()
}
func StopEventManager() {
_defaultEventManager.Stop()
}
// OnEvent push event to gorotine pool then handled automatic.
func OnEvent(event event.Event) {
_defaultEventManager.OnEvent(event)
}
Loading…
Cancel
Save