add events manager sub-system

x/helm
Michael Li 10 months ago
parent da8263a3ed
commit ed909ba497
No known key found for this signature in database

@ -6,7 +6,7 @@ require (
github.com/Masterminds/semver/v3 v3.2.1
github.com/afocus/captcha v0.0.0-20191010092841-4bd1f21c8868
github.com/alimy/mir/v4 v4.0.0
github.com/alimy/tryst v0.4.0
github.com/alimy/tryst v0.6.0
github.com/aliyun/aliyun-oss-go-sdk v2.2.9+incompatible
github.com/allegro/bigcache/v3 v3.1.0
github.com/bufbuild/connect-go v1.10.0

@ -125,8 +125,8 @@ github.com/alexflint/go-filemutex v0.0.0-20171022225611-72bdc8eae2ae/go.mod h1:C
github.com/alexflint/go-filemutex v1.1.0/go.mod h1:7P4iRhttt/nUvUOrYIhcpMzv2G6CY9UnI16Z+UJqRyk=
github.com/alimy/mir/v4 v4.0.0 h1:MzGfmoLjjvR69jbZEmpKJO3tUuqB0RGRv1UWPbtukBg=
github.com/alimy/mir/v4 v4.0.0/go.mod h1:d58dBvw2KImcVbAUANrciEV/of0arMNsI9c/5UNCMMc=
github.com/alimy/tryst v0.4.0 h1:OjidWhVWxKonoFzJ1oIZ5+gikDSaJaKzqHGP9BxMQ5o=
github.com/alimy/tryst v0.4.0/go.mod h1:K//dPeoE/nnv2Jw8C3iPE7n8mO6LVqAxVmqbopM9nAk=
github.com/alimy/tryst v0.6.0 h1:wbPAGpkNXVPWtJG30g5RERF/lnYHkM7BM8Jsw8HJTrc=
github.com/alimy/tryst v0.6.0/go.mod h1:K//dPeoE/nnv2Jw8C3iPE7n8mO6LVqAxVmqbopM9nAk=
github.com/aliyun/aliyun-oss-go-sdk v2.2.9+incompatible h1:Sg/2xHwDrioHpxTN6WMiwbXTpUEinBpHsN7mG21Rc2k=
github.com/aliyun/aliyun-oss-go-sdk v2.2.9+incompatible/go.mod h1:T/Aws4fEfogEE9v+HPhhw+CntffsBHJ8nXQCwKr0/g8=
github.com/allegro/bigcache/v3 v3.1.0 h1:H2Vp8VOvxcrB91o86fUSVJFqeuz8kpyyB02eH3bSzwk=

@ -35,6 +35,7 @@ var (
DocsServerSetting *httpServerConf
MobileServerSetting *grpcServerConf
AppSetting *appConf
EventManagerSetting *eventManagerConf
CacheIndexSetting *cacheIndexConf
SimpleCacheIndexSetting *simpleCacheIndexConf
BigCacheIndexSetting *bigCacheIndexConf
@ -69,6 +70,7 @@ func setupSetting(suite []string, noDefault bool) error {
objects := map[string]any{
"App": &AppSetting,
"EventManager": &EventManagerSetting,
"PprofServer": &PprofServerSetting,
"WebServer": &WebServerSetting,
"AdminServer": &AdminServerSetting,
@ -115,6 +117,7 @@ func setupSetting(suite []string, noDefault bool) error {
}
}
EventManagerSetting.TickWaitTime *= time.Second
JWTSetting.Expire *= time.Second
SimpleCacheIndexSetting.CheckTickDuration *= time.Second
SimpleCacheIndexSetting.ExpireTickDuration *= time.Second

@ -5,6 +5,12 @@ App: # APP基础设置项
DefaultContextTimeout: 60
DefaultPageSize: 10
MaxPageSize: 100
EventManager: # 事件管理器的配置参数
MinWorker: 10 # 最小后台工作者, 设置范围[5, ++], 默认10
MaxEventBuf: 100 # 最大log缓存条数, 设置范围[10, ++], 默认100
MaxTempEventBuf: 100 # 最大log缓存条数, 设置范围[10, ++], 默认100
MaxTickCount: 60 # 最大的循环周期, 设置范围[60, ++], 默认60
TickWaitTime: 1 # 一个周期的等待时间,单位:秒 默认1s
Features:
Default: []
WebServer: # Web服务

@ -96,6 +96,14 @@ type appConf struct {
MaxPageSize int
}
type eventManagerConf struct {
MinWorker int
MaxEventBuf int
MaxTempEventBuf int
MaxTickCount int
TickWaitTime time.Duration
}
type cacheIndexConf struct {
MaxUpdateQPS int
MinWorker int

@ -0,0 +1,52 @@
// Copyright 2023 ROC. All rights reserved.
// Use of this source code is governed by a MIT style
// license that can be found in the LICENSE file.
package events
import (
"github.com/alimy/tryst/event"
"github.com/alimy/tryst/pool"
"github.com/rocboss/paopao-ce/internal/conf"
"github.com/sirupsen/logrus"
)
var _defaultEventManager event.EventManager
// OnEvent push event to gorotine pool then handled automatic.
func OnEvent(event event.Event) {
_defaultEventManager.OnEvent(event)
}
func Initial() {
var opts []pool.Option
s := conf.EventManagerSetting
if s.MinWorker > 5 {
opts = append(opts, pool.MinWorkerOpt(s.MinWorker))
} else {
opts = append(opts, pool.MinWorkerOpt(5))
}
if s.MaxEventBuf > 10 {
opts = append(opts, pool.MaxRequestBufOpt(s.MaxEventBuf))
} else {
opts = append(opts, pool.MaxRequestBufOpt(10))
}
if s.MaxTempEventBuf > 10 {
opts = append(opts, pool.MaxRequestTempBufOpt(s.MaxTempEventBuf))
} else {
opts = append(opts, pool.MaxRequestTempBufOpt(10))
}
opts = append(opts, pool.MaxTickCountOpt(s.MaxTickCount), pool.TickWaitTimeOpt(s.TickWaitTime))
_defaultEventManager = event.NewEventManager(func(req event.Event, err error) {
logrus.Errorf("handle event[%s] occurs error: %s", req.Name(), err)
}, opts...)
}
func Restart() {
_defaultEventManager.Stop()
_defaultEventManager.Start()
}
func Done() {
_defaultEventManager.Stop()
}

@ -5,10 +5,13 @@
package internal
import (
"github.com/rocboss/paopao-ce/internal/events"
"github.com/rocboss/paopao-ce/internal/migration"
)
func Initial() {
// migrate database if needed
migration.Run()
// event manager system initialize
events.Initial()
}

@ -21,9 +21,9 @@ import (
"github.com/rocboss/paopao-ce/internal/core/ms"
"github.com/rocboss/paopao-ce/internal/dao"
"github.com/rocboss/paopao-ce/internal/dao/cache"
"github.com/rocboss/paopao-ce/internal/events"
"github.com/rocboss/paopao-ce/pkg/app"
"github.com/rocboss/paopao-ce/pkg/xerror"
"github.com/sirupsen/logrus"
)
type BaseServant struct {
@ -203,9 +203,16 @@ func (s *DaoServant) GetTweetBy(id int64) (*ms.PostFormated, error) {
return postFormated, nil
}
func (s *DaoServant) PushPostsToSearch(c context.Context) {
if err := s.Redis.SetPushToSearchJob(c); err == nil {
defer s.Redis.DelPushToSearchJob(c)
func (s *DaoServant) PushAllPostToSearch() {
events.OnEvent(&pushAllPostToSearchEvent{
fn: s.pushAllPostToSearch,
})
}
func (s *DaoServant) pushAllPostToSearch() error {
ctx := context.Background()
if err := s.Redis.SetPushToSearchJob(ctx); err == nil {
defer s.Redis.DelPushToSearchJob(ctx)
splitNum := 1000
conditions := ms.ConditionsT{
@ -234,11 +241,19 @@ func (s *DaoServant) PushPostsToSearch(c context.Context) {
}
}
} else {
logrus.Errorf("redis: set JOB_PUSH_TO_SEARCH error: %s", err)
return fmt.Errorf("redis: set JOB_PUSH_TO_SEARCH error: %w", err)
}
return nil
}
func (s *DaoServant) PushPostToSearch(post *ms.Post) {
events.OnEvent(&pushPostToSearchEvent{
fn: s.pushPostToSearch,
post: post,
})
}
func (s *DaoServant) pushPostToSearch(post *ms.Post) {
postFormated := post.Format()
postFormated.User = &ms.UserFormated{
ID: post.UserID,
@ -247,14 +262,12 @@ func (s *DaoServant) PushPostToSearch(post *ms.Post) {
for _, content := range contents {
postFormated.Contents = append(postFormated.Contents, content.Format())
}
contentFormated := ""
for _, content := range postFormated.Contents {
if content.Type == ms.ContentTypeText || content.Type == ms.ContentTypeTitle {
contentFormated = contentFormated + content.Content + "\n"
}
}
docs := []core.TsDocItem{{
Post: post,
Content: contentFormated,

@ -0,0 +1,38 @@
// Copyright 2023 ROC. All rights reserved.
// Use of this source code is governed by a MIT style
// license that can be found in the LICENSE file.
package base
import (
"github.com/alimy/tryst/event"
"github.com/rocboss/paopao-ce/internal/core/ms"
)
type pushPostToSearchEvent struct {
event.UnimplementedEvent
fn func(*ms.Post)
post *ms.Post
}
type pushAllPostToSearchEvent struct {
event.UnimplementedEvent
fn func() error
}
func (p *pushPostToSearchEvent) Name() string {
return "servants.base.pushPostToSearchEvent"
}
func (p *pushPostToSearchEvent) Action() (err error) {
p.fn(p.post)
return
}
func (p *pushAllPostToSearchEvent) Name() string {
return "servants.base.pushAllPostToSearchEvent"
}
func (p *pushAllPostToSearchEvent) Action() error {
return p.fn()
}

@ -45,7 +45,7 @@ func (s *coreSrv) Chain() gin.HandlersChain {
func (s *coreSrv) SyncSearchIndex(req *web.SyncSearchIndexReq) mir.Error {
if req.User != nil && req.User.IsAdmin {
go s.PushPostsToSearch(context.Background())
s.PushAllPostToSearch()
} else {
logrus.Warnf("sync search index need admin permision user: %#v", req.User)
}

Loading…
Cancel
Save