diff --git a/go.mod b/go.mod index 97d594a7..faa4132d 100644 --- a/go.mod +++ b/go.mod @@ -30,6 +30,7 @@ require ( github.com/onsi/gomega v1.27.10 github.com/pyroscope-io/client v0.7.2 github.com/redis/rueidis v1.0.17 + github.com/robfig/cron/v3 v3.0.1 github.com/sirupsen/logrus v1.9.3 github.com/smartwalle/alipay/v3 v3.2.16 github.com/sourcegraph/conc v0.3.0 diff --git a/go.sum b/go.sum index 43f1eef7..eb5476fa 100644 --- a/go.sum +++ b/go.sum @@ -1125,6 +1125,8 @@ github.com/remyoudompheng/bigfft v0.0.0-20190728182440-6a916e37a237/go.mod h1:qq github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE= github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= +github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= +github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg= github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= github.com/rogpeppe/go-internal v1.1.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= diff --git a/internal/events/events.go b/internal/events/events.go index 9b5bb719..01b9c15c 100644 --- a/internal/events/events.go +++ b/internal/events/events.go @@ -7,55 +7,20 @@ package events import ( "sync" - "github.com/alimy/tryst/event" - "github.com/alimy/tryst/pool" - "github.com/rocboss/paopao-ce/internal/conf" + "github.com/alimy/tryst/cfg" "github.com/sirupsen/logrus" ) var ( - _defaultEventManager event.EventManager - _onceInitial sync.Once + _onceInitial sync.Once ) -// OnEvent push event to gorotine pool then handled automatic. -func OnEvent(event event.Event) { - _defaultEventManager.OnEvent(event) -} - func Initial() { _onceInitial.Do(func() { - var opts []pool.Option - s := conf.EventManagerSetting - if s.MinWorker > 5 { - opts = append(opts, pool.MinWorkerOpt(s.MinWorker)) - } else { - opts = append(opts, pool.MinWorkerOpt(5)) - } - if s.MaxEventBuf > 10 { - opts = append(opts, pool.MaxRequestBufOpt(s.MaxEventBuf)) - } else { - opts = append(opts, pool.MaxRequestBufOpt(10)) - } - if s.MaxTempEventBuf > 10 { - opts = append(opts, pool.MaxRequestTempBufOpt(s.MaxTempEventBuf)) - } else { - opts = append(opts, pool.MaxRequestTempBufOpt(10)) + initEventManager() + if cfg.If("JobManager") { + initJobManager() + logrus.Debugln("initial JobManager") } - opts = append(opts, pool.MaxTickCountOpt(s.MaxTickCount), pool.TickWaitTimeOpt(s.TickWaitTime)) - _defaultEventManager = event.NewEventManager(func(req event.Event, err error) { - if err != nil { - logrus.Errorf("handle event[%s] occurs error: %s", req.Name(), err) - } - }, opts...) }) } - -func Restart() { - _defaultEventManager.Stop() - _defaultEventManager.Start() -} - -func Done() { - _defaultEventManager.Stop() -} diff --git a/internal/events/jobs.go b/internal/events/jobs.go new file mode 100644 index 00000000..f3cc984a --- /dev/null +++ b/internal/events/jobs.go @@ -0,0 +1,110 @@ +// Copyright 2023 ROC. All rights reserved. +// Use of this source code is governed by a MIT style +// license that can be found in the LICENSE file. + +package events + +import ( + "github.com/robfig/cron/v3" +) + +var ( + _defaultJobManager JobManager = (*jobManager)(nil) +) + +type ( + EntryID = cron.EntryID +) + +// JobFn job help function that implement cron.Job interface +type JobFn func() + +func (fn JobFn) Run() { + fn() +} + +// Job job interface +type Job interface { + cron.Schedule + cron.Job +} + +type simpleJob struct { + cron.Schedule + cron.Job +} + +// JobManager job manger interface +type JobManager interface { + Start() + Stop() + Remove(id EntryID) + Schedule(Job) EntryID +} + +type jobManager struct { + m *cron.Cron +} + +func (j *jobManager) Start() { + j.m.Start() +} + +func (j *jobManager) Stop() { + j.m.Stop() +} + +// Remove an entry from being run in the future. +func (j *jobManager) Remove(id EntryID) { + j.m.Remove(id) +} + +// Schedule adds a Job to the Cron to be run on the given schedule. +// The job is wrapped with the configured Chain. +func (j *jobManager) Schedule(job Job) EntryID { + return j.m.Schedule(job, job) +} + +func initJobManager() { + _defaultJobManager = &jobManager{ + m: cron.New(), + } + StartJobManager() +} + +func StartJobManager() { + _defaultJobManager.Start() +} + +func StopJobManager() { + _defaultJobManager.Stop() +} + +// NewJob create new Job instance +func NewJob(s cron.Schedule, fn JobFn) Job { + return &simpleJob{ + Schedule: s, + Job: fn, + } +} + +// RemoveJob an entry from being run in the future. +func RemoveJob(id EntryID) { + _defaultJobManager.Remove(id) +} + +// ScheduleJob adds a Job to the Cron to be run on the given schedule. +// The job is wrapped with the configured Chain. +func ScheduleJob(job Job) EntryID { + return _defaultJobManager.Schedule(job) +} + +// Schedule adds a Job to the Cron to be run on the given schedule. +// The job is wrapped with the configured Chain. +func Schedule(s cron.Schedule, fn JobFn) EntryID { + job := &simpleJob{ + Schedule: s, + Job: fn, + } + return _defaultJobManager.Schedule(job) +} diff --git a/internal/events/pool.go b/internal/events/pool.go new file mode 100644 index 00000000..f5170591 --- /dev/null +++ b/internal/events/pool.go @@ -0,0 +1,55 @@ +// Copyright 2023 ROC. All rights reserved. +// Use of this source code is governed by a MIT style +// license that can be found in the LICENSE file. + +package events + +import ( + "github.com/alimy/tryst/event" + "github.com/alimy/tryst/pool" + "github.com/rocboss/paopao-ce/internal/conf" + "github.com/sirupsen/logrus" +) + +var ( + _defaultEventManager event.EventManager +) + +func initEventManager() { + var opts []pool.Option + s := conf.EventManagerSetting + if s.MinWorker > 5 { + opts = append(opts, pool.MinWorkerOpt(s.MinWorker)) + } else { + opts = append(opts, pool.MinWorkerOpt(5)) + } + if s.MaxEventBuf > 10 { + opts = append(opts, pool.MaxRequestBufOpt(s.MaxEventBuf)) + } else { + opts = append(opts, pool.MaxRequestBufOpt(10)) + } + if s.MaxTempEventBuf > 10 { + opts = append(opts, pool.MaxRequestTempBufOpt(s.MaxTempEventBuf)) + } else { + opts = append(opts, pool.MaxRequestTempBufOpt(10)) + } + opts = append(opts, pool.MaxTickCountOpt(s.MaxTickCount), pool.TickWaitTimeOpt(s.TickWaitTime)) + _defaultEventManager = event.NewEventManager(func(req event.Event, err error) { + if err != nil { + logrus.Errorf("handle event[%s] occurs error: %s", req.Name(), err) + } + }, opts...) +} + +func StartEventManager() { + _defaultEventManager.Start() +} + +func StopEventManager() { + _defaultEventManager.Stop() +} + +// OnEvent push event to gorotine pool then handled automatic. +func OnEvent(event event.Event) { + _defaultEventManager.OnEvent(event) +}