From abfee0621fcb6a195ec92a6373841365ba9d63af Mon Sep 17 00:00:00 2001 From: alimy Date: Fri, 10 Jun 2022 22:32:43 +0800 Subject: [PATCH 1/5] optimize add cache first number page of post in custom configure --- config.yaml.sample | 14 ++++- internal/conf/conf.go | 24 ++++---- internal/conf/logger.go | 6 ++ internal/conf/settting.go | 14 ++++- internal/core/core.go | 1 + internal/dao/dao.go | 39 +++++++++++- internal/dao/post.go | 25 ++++++-- internal/dao/post_action.go | 28 +++++++++ internal/dao/post_index.go | 112 +++++++++++++++++++++++++++++++++++ internal/routers/api/post.go | 16 ++--- internal/service/post.go | 4 ++ pkg/app/pagination.go | 17 ++++-- 12 files changed, 264 insertions(+), 36 deletions(-) create mode 100644 internal/dao/post_action.go create mode 100644 internal/dao/post_index.go diff --git a/config.yaml.sample b/config.yaml.sample index cd0c55d1..5aa7eb49 100644 --- a/config.yaml.sample +++ b/config.yaml.sample @@ -12,9 +12,11 @@ Server: # 服务设置 ReadTimeout: 60 WriteTimeout: 60 Features: - Default: ["Alipay", "Zinc", "MySQL", "Redis", "MinIO", "LoggerFile"] - Develop: ["Sms", "Zinc", "MySQL", "AliOSS", "LoggerZinc"] - Slim: ["Zinc", "MySQL", "Redis", "LocalOSS", "LoggerFile"] + Default: ["Base", "Option", "MinIO", "LoggerFile"] + Develop: ["Base", "Option", "Sms", "AliOSS", "LoggerZinc"] + Slim: ["Base", "LocalOSS", "LoggerFile"] + Base: ["Zinc", "MySQL", "Redis", "Alipay",] + Option: ["CacheIndex"] Sms: "SmsJuhe" SmsJuhe: Key: @@ -23,15 +25,21 @@ SmsJuhe: Alipay: AppID: PrivateKey: +CacheIndex: # 缓存泡泡广场消息流 + MaxIndexSize: 200 # 最大缓存条数 + CheckTickDuration: 60 # 循环自检查每多少秒一次 + ExpireTickDuration: 300 # 每多少秒后强制过期缓存 LoggerFile: # 使用File写日志 SavePath: data/paopao-ce/logs FileName: app FileExt: .log + Level: info LoggerZinc: # 使用Zinc写日志 Host: http://zinc:4080/es/_bulk Index: paopao-log User: admin Password: admin + Level: info JWT: # 鉴权加密 Secret: 18a6413dc4fe394c66345ebe501b2f26 Issuer: paopao-api diff --git a/internal/conf/conf.go b/internal/conf/conf.go index 9e54807a..4ba67b07 100644 --- a/internal/conf/conf.go +++ b/internal/conf/conf.go @@ -13,17 +13,18 @@ var ( redisSetting *RedisSettingS features *FeaturesSettingS - ServerSetting *ServerSettingS - AppSetting *AppSettingS - SmsJuheSetting *SmsJuheSettings - AlipaySetting *AlipaySettingS - ZincSetting *ZincSettingS - AliOSSSetting *AliOSSSettingS - MinIOSetting *MinIOSettingS - S3Setting *S3SettingS - LocalOSSSetting *LocalOSSSettingS - JWTSetting *JWTSettingS - Mutex *sync.Mutex + ServerSetting *ServerSettingS + AppSetting *AppSettingS + CacheIndexSetting *CacheIndexSettingS + SmsJuheSetting *SmsJuheSettings + AlipaySetting *AlipaySettingS + ZincSetting *ZincSettingS + AliOSSSetting *AliOSSSettingS + MinIOSetting *MinIOSettingS + S3Setting *S3SettingS + LocalOSSSetting *LocalOSSSettingS + JWTSetting *JWTSettingS + Mutex *sync.Mutex ) func setupSetting(suite []string, noDefault bool) error { @@ -42,6 +43,7 @@ func setupSetting(suite []string, noDefault bool) error { objects := map[string]interface{}{ "App": &AppSetting, "Server": &ServerSetting, + "CacheIndex": &CacheIndexSetting, "Alipay": &AlipaySetting, "SmsJuhe": &SmsJuheSetting, "LoggerFile": &loggerFileSetting, diff --git a/internal/conf/logger.go b/internal/conf/logger.go index e7f183fe..31724a6d 100644 --- a/internal/conf/logger.go +++ b/internal/conf/logger.go @@ -74,6 +74,9 @@ func setupLogger() { LocalTime: true, } logrus.SetOutput(out) + if level, err := logrus.ParseLevel(loggerFileSetting.Level); err == nil { + logrus.SetLevel(level) + } } else if CfgIf("LoggerZinc") { hook := &zincLogHook{ host: loggerZincSetting.Host, @@ -81,6 +84,9 @@ func setupLogger() { user: loggerZincSetting.User, password: loggerZincSetting.Password, } + if level, err := logrus.ParseLevel(loggerZincSetting.Level); err == nil { + logrus.SetLevel(level) + } logrus.SetOutput(io.Discard) logrus.AddHook(hook) } diff --git a/internal/conf/settting.go b/internal/conf/settting.go index 8b6f8fbc..7a8f7e36 100644 --- a/internal/conf/settting.go +++ b/internal/conf/settting.go @@ -16,6 +16,7 @@ type LoggerFileSettingS struct { SavePath string FileName string FileExt string + Level string } type LoggerZincSettingS struct { @@ -23,6 +24,7 @@ type LoggerZincSettingS struct { Index string User string Password string + Level string } type ServerSettingS struct { @@ -44,6 +46,12 @@ type AppSettingS struct { TronApiKeys []string } +type CacheIndexSettingS struct { + MaxIndexSize int + CheckTickDuration int + ExpireTickDuration int +} + type AlipaySettingS struct { AppID string PrivateKey string @@ -197,7 +205,6 @@ func (f *FeaturesSettingS) Use(suite []string, noDefault bool) error { } features := f.flatFeatures(suite) for _, feature := range features { - feature = strings.ToLower(feature) if len(feature) == 0 { continue } @@ -209,10 +216,11 @@ func (f *FeaturesSettingS) Use(suite []string, noDefault bool) error { func (f *FeaturesSettingS) flatFeatures(suite []string) []string { features := make([]string, 0, len(suite)+10) for s := suite[:]; len(s) > 0; s = s[:len(s)-1] { - if items, exist := f.suites[s[0]]; exist { + item := strings.ToLower(s[0]) + if items, exist := f.suites[item]; exist { s = append(s, items...) } - features = append(features, s[0]) + features = append(features, item) s[0] = s[len(s)-1] } return features diff --git a/internal/core/core.go b/internal/core/core.go index 7a0b426f..cdd1a92b 100644 --- a/internal/core/core.go +++ b/internal/core/core.go @@ -34,6 +34,7 @@ type DataService interface { LockPost(post *model.Post) error StickPost(post *model.Post) error GetPostByID(id int64) (*model.Post, error) + IndexPosts(offset int, limit int) ([]*model.PostFormated, error) GetPosts(conditions *model.ConditionsT, offset, limit int) ([]*model.Post, error) GetPostCount(conditions *model.ConditionsT) (int64, error) UpdatePost(post *model.Post) error diff --git a/internal/dao/dao.go b/internal/dao/dao.go index e2ead4d7..96d6972c 100644 --- a/internal/dao/dao.go +++ b/internal/dao/dao.go @@ -1,10 +1,14 @@ package dao import ( + "sync/atomic" + "time" + "github.com/aliyun/aliyun-oss-go-sdk/oss" "github.com/minio/minio-go/v7" "github.com/rocboss/paopao-ce/internal/conf" "github.com/rocboss/paopao-ce/internal/core" + "github.com/rocboss/paopao-ce/internal/model" "github.com/rocboss/paopao-ce/pkg/zinc" "github.com/sirupsen/logrus" "gorm.io/gorm" @@ -22,6 +26,14 @@ var ( type dataServant struct { engine *gorm.DB zinc *zinc.ZincClient + + useCacheIndex bool + indexActionCh chan indexActionT + indexPosts []*model.PostFormated + atomicIndex atomic.Value + maxIndexSize int + checkTick *time.Ticker + expireIndexTick *time.Ticker } type localossServant struct { @@ -47,10 +59,31 @@ type attachmentCheckServant struct { } func NewDataService(engine *gorm.DB, zinc *zinc.ZincClient) core.DataService { - return &dataServant{ - engine: engine, - zinc: zinc, + if !conf.CfgIf("CacheIndex") { + return &dataServant{ + engine: engine, + zinc: zinc, + useCacheIndex: false, + } + } + + s := conf.CacheIndexSetting + ds := &dataServant{ + engine: engine, + zinc: zinc, + useCacheIndex: true, + maxIndexSize: conf.CacheIndexSetting.MaxIndexSize, + indexPosts: make([]*model.PostFormated, 0), + indexActionCh: make(chan indexActionT, 100), // optimize: size need configure by custom + checkTick: time.NewTicker(time.Duration(s.CheckTickDuration) * time.Second), // check whether need update index every 1 minute + expireIndexTick: time.NewTicker(time.Duration(s.ExpireTickDuration) * time.Second), // force expire index every 5 minute } + + // start index posts + ds.atomicIndex.Store(ds.indexPosts) + go ds.startIndexPosts() + + return ds } func NewObjectStorageService() (oss core.ObjectStorageService) { diff --git a/internal/dao/post.go b/internal/dao/post.go index 9773a61c..8e9ba8ed 100644 --- a/internal/dao/post.go +++ b/internal/dao/post.go @@ -8,11 +8,20 @@ import ( func (d *dataServant) CreatePost(post *model.Post) (*model.Post, error) { post.LatestRepliedOn = time.Now().Unix() - return post.Create(d.engine) + p, err := post.Create(d.engine) + if err != nil { + return nil, err + } + d.indexActive(idxActCreatePost) + return p, nil } func (d *dataServant) DeletePost(post *model.Post) error { - return post.Delete(d.engine) + if err := post.Delete(d.engine); err != nil { + return err + } + d.indexActive(idxActDeletePost) + return nil } func (d *dataServant) LockPost(post *model.Post) error { @@ -22,7 +31,11 @@ func (d *dataServant) LockPost(post *model.Post) error { func (d *dataServant) StickPost(post *model.Post) error { post.IsTop = 1 - post.IsTop - return post.Update(d.engine) + if err := post.Update(d.engine); err != nil { + return err + } + d.indexActive(idxActStickPost) + return nil } func (d *dataServant) GetPostByID(id int64) (*model.Post, error) { @@ -43,7 +56,11 @@ func (d *dataServant) GetPostCount(conditions *model.ConditionsT) (int64, error) } func (d *dataServant) UpdatePost(post *model.Post) error { - return post.Update(d.engine) + if err := post.Update(d.engine); err != nil { + return err + } + d.indexActive(idxActUpdatePost) + return nil } func (d *dataServant) GetUserPostStar(postID, userID int64) (*model.PostStar, error) { diff --git a/internal/dao/post_action.go b/internal/dao/post_action.go new file mode 100644 index 00000000..408ef6ce --- /dev/null +++ b/internal/dao/post_action.go @@ -0,0 +1,28 @@ +package dao + +const ( + idxActNop indexActionT = iota + 1 + idxActCreatePost + idxActUpdatePost + idxActDeletePost + idxActStickPost +) + +type indexActionT uint8 + +func (a indexActionT) String() string { + switch a { + case idxActNop: + return "no operator" + case idxActCreatePost: + return "create post" + case idxActUpdatePost: + return "update post" + case idxActDeletePost: + return "delete post" + case idxActStickPost: + return "stick post" + default: + return "unknow action" + } +} diff --git a/internal/dao/post_index.go b/internal/dao/post_index.go new file mode 100644 index 00000000..1072da27 --- /dev/null +++ b/internal/dao/post_index.go @@ -0,0 +1,112 @@ +package dao + +import ( + "github.com/rocboss/paopao-ce/internal/model" + "github.com/sirupsen/logrus" +) + +func (d *dataServant) IndexPosts(offset int, limit int) ([]*model.PostFormated, error) { + if d.useCacheIndex { + posts := d.atomicIndex.Load().([]*model.PostFormated) + start := offset * limit + end := start + limit + if len(posts) >= end { + logrus.Debugln("get index posts from cached") + return posts[start:end], nil + } + } + logrus.Debugf("get index posts from database but useCacheIndex: %t", d.useCacheIndex) + return d.getIndexPosts(offset, limit) +} + +func (d *dataServant) MergePosts(posts []*model.Post) ([]*model.PostFormated, error) { + postIds := make([]int64, 0, len(posts)) + userIds := make([]int64, 0, len(posts)) + for _, post := range posts { + postIds = append(postIds, post.ID) + userIds = append(userIds, post.UserID) + } + + postContents, err := d.GetPostContentsByIDs(postIds) + if err != nil { + return nil, err + } + + users, err := d.GetUsersByIDs(userIds) + if err != nil { + return nil, err + } + + userMap := make(map[int64]*model.UserFormated, len(users)) + for _, user := range users { + userMap[user.ID] = user.Format() + } + + contentMap := make(map[int64][]*model.PostContentFormated, len(postContents)) + for _, content := range postContents { + contentMap[content.PostID] = append(contentMap[content.PostID], content.Format()) + } + + // 数据整合 + postsFormated := make([]*model.PostFormated, 0, len(posts)) + for _, post := range posts { + postFormated := post.Format() + postFormated.User = userMap[post.UserID] + postFormated.Contents = contentMap[post.ID] + postsFormated = append(postsFormated, postFormated) + } + return postsFormated, nil +} + +func (d *dataServant) getIndexPosts(offset int, limit int) ([]*model.PostFormated, error) { + posts, err := (&model.Post{}).List(d.engine, &model.ConditionsT{ + "ORDER": "is_top DESC, latest_replied_on DESC", + }, offset, limit) + if err != nil { + logrus.Debugf("getIndexPosts err: %v", err) + return nil, err + } + return d.MergePosts(posts) +} + +func (d *dataServant) indexActive(act indexActionT) { + select { + case d.indexActionCh <- act: + logrus.Debugf("send indexAction by chan: %s", act) + default: + go func(ch chan<- indexActionT, act indexActionT) { + logrus.Debugf("send indexAction by goroutine: %s", act) + ch <- act + }(d.indexActionCh, act) + } +} + +func (d *dataServant) startIndexPosts() { + var err error + for { + select { + case <-d.checkTick.C: + if len(d.indexPosts) == 0 { + logrus.Debugf("index posts by checkTick") + if d.indexPosts, err = d.getIndexPosts(0, d.maxIndexSize); err == nil { + d.atomicIndex.Store(d.indexPosts) + } else { + logrus.Errorf("get index posts err: %v", err) + } + } + case <-d.expireIndexTick.C: + logrus.Debugf("expire index posts by expireIndexTick") + d.indexPosts = nil + d.atomicIndex.Store(d.indexPosts) + case action := <-d.indexActionCh: + switch action { + case idxActCreatePost, idxActUpdatePost, idxActDeletePost, idxActStickPost: + logrus.Debugf("remove index posts by action %s", action) + d.indexPosts = nil + d.atomicIndex.Store(d.indexPosts) + default: + // nop + } + } + } +} diff --git a/internal/routers/api/post.go b/internal/routers/api/post.go index b595a482..942a130d 100644 --- a/internal/routers/api/post.go +++ b/internal/routers/api/post.go @@ -24,13 +24,15 @@ func GetPostList(c *gin.Context) { if q.Query == "" && q.Type == "search" { // 直接读库 - posts, err := service.GetPostList(&service.PostListReq{ - Conditions: &model.ConditionsT{ - "ORDER": "is_top DESC, latest_replied_on DESC", - }, - Offset: (app.GetPage(c) - 1) * app.GetPageSize(c), - Limit: app.GetPageSize(c), - }) + // posts, err := service.GetPostList(&service.PostListReq{ + // Conditions: &model.ConditionsT{ + // "ORDER": "is_top DESC, latest_replied_on DESC", + // }, + // Offset: (app.GetPage(c) - 1) * app.GetPageSize(c), + // Limit: app.GetPageSize(c), + // }) + offset, limit := app.GetPageOffset(c) + posts, err := service.GetIndexPosts(offset, limit) if err != nil { logrus.Errorf("service.GetPostList err: %v\n", err) response.ToErrorResponse(errcode.GetPostsFailed) diff --git a/internal/service/post.go b/internal/service/post.go index f692fb88..00bc494c 100644 --- a/internal/service/post.go +++ b/internal/service/post.go @@ -324,6 +324,10 @@ func GetPostContentByID(id int64) (*model.PostContent, error) { return ds.GetPostContentByID(id) } +func GetIndexPosts(offset int, limit int) ([]*model.PostFormated, error) { + return ds.IndexPosts(offset, limit) +} + func GetPostList(req *PostListReq) ([]*model.PostFormated, error) { posts, err := ds.GetPosts(req.Conditions, req.Offset, req.Limit) diff --git a/pkg/app/pagination.go b/pkg/app/pagination.go index 332e25c6..1888efb9 100644 --- a/pkg/app/pagination.go +++ b/pkg/app/pagination.go @@ -27,11 +27,18 @@ func GetPageSize(c *gin.Context) int { return pageSize } -func GetPageOffset(page, pageSize int) int { - result := 0 - if page > 0 { - result = (page - 1) * pageSize +func GetPageOffset(c *gin.Context) (offset, limit int) { + page := convert.StrTo(c.Query("page")).MustInt() + if page <= 0 { + page = 1 } - return result + limit = convert.StrTo(c.Query("page_size")).MustInt() + if limit <= 0 { + limit = conf.AppSetting.DefaultPageSize + } else if limit > conf.AppSetting.MaxPageSize { + limit = conf.AppSetting.MaxPageSize + } + offset = (page - 1) * limit + return } From 88e1b6f417a30bf087432faf79693cfb15745518 Mon Sep 17 00:00:00 2001 From: alimy Date: Fri, 10 Jun 2022 23:52:04 +0800 Subject: [PATCH 2/5] optimize #78 prevent many update pos in least time to expire index post cache --- internal/dao/post_index.go | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/internal/dao/post_index.go b/internal/dao/post_index.go index 1072da27..430477eb 100644 --- a/internal/dao/post_index.go +++ b/internal/dao/post_index.go @@ -96,14 +96,19 @@ func (d *dataServant) startIndexPosts() { } case <-d.expireIndexTick.C: logrus.Debugf("expire index posts by expireIndexTick") - d.indexPosts = nil - d.atomicIndex.Store(d.indexPosts) + if len(d.indexPosts) != 0 { + d.indexPosts = nil + d.atomicIndex.Store(d.indexPosts) + } case action := <-d.indexActionCh: switch action { case idxActCreatePost, idxActUpdatePost, idxActDeletePost, idxActStickPost: logrus.Debugf("remove index posts by action %s", action) - d.indexPosts = nil - d.atomicIndex.Store(d.indexPosts) + // prevent many update post in least time + if len(d.indexPosts) != 0 { + d.indexPosts = nil + d.atomicIndex.Store(d.indexPosts) + } default: // nop } From 962855eb11b66a8704eb625954e28bd771705112 Mon Sep 17 00:00:00 2001 From: alimy Date: Fri, 10 Jun 2022 23:54:01 +0800 Subject: [PATCH 3/5] optimize #78 debug info ajust --- internal/dao/post_index.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/dao/post_index.go b/internal/dao/post_index.go index 430477eb..56a8c63f 100644 --- a/internal/dao/post_index.go +++ b/internal/dao/post_index.go @@ -103,9 +103,9 @@ func (d *dataServant) startIndexPosts() { case action := <-d.indexActionCh: switch action { case idxActCreatePost, idxActUpdatePost, idxActDeletePost, idxActStickPost: - logrus.Debugf("remove index posts by action %s", action) // prevent many update post in least time if len(d.indexPosts) != 0 { + logrus.Debugf("remove index posts by action %s", action) d.indexPosts = nil d.atomicIndex.Store(d.indexPosts) } From c215566edeec5749231ca18db29a999977da35e7 Mon Sep 17 00:00:00 2001 From: alimy Date: Sat, 11 Jun 2022 00:04:29 +0800 Subject: [PATCH 4/5] optimize features process logic --- internal/conf/settting.go | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/internal/conf/settting.go b/internal/conf/settting.go index 7a8f7e36..3c572b30 100644 --- a/internal/conf/settting.go +++ b/internal/conf/settting.go @@ -216,11 +216,13 @@ func (f *FeaturesSettingS) Use(suite []string, noDefault bool) error { func (f *FeaturesSettingS) flatFeatures(suite []string) []string { features := make([]string, 0, len(suite)+10) for s := suite[:]; len(s) > 0; s = s[:len(s)-1] { - item := strings.ToLower(s[0]) - if items, exist := f.suites[item]; exist { - s = append(s, items...) + item := strings.TrimSpace(strings.ToLower(s[0])) + if len(item) > 0 { + if items, exist := f.suites[item]; exist { + s = append(s, items...) + } + features = append(features, item) } - features = append(features, item) s[0] = s[len(s)-1] } return features From 3c3071791ecf6711e45337474f21ac877c880410 Mon Sep 17 00:00:00 2001 From: alimy Date: Sat, 11 Jun 2022 07:45:43 +0800 Subject: [PATCH 5/5] optimize #78 add abstract cache index servie --- internal/core/cache.go | 38 ++++++++++++++-- internal/dao/cache_index.go | 86 ++++++++++++++++++++++++++++++++++++ internal/dao/dao.go | 39 +++++++--------- internal/dao/post.go | 9 ++-- internal/dao/post_action.go | 28 ------------ internal/dao/post_index.go | 55 +++-------------------- internal/routers/api/post.go | 8 ---- 7 files changed, 149 insertions(+), 114 deletions(-) create mode 100644 internal/dao/cache_index.go delete mode 100644 internal/dao/post_action.go diff --git a/internal/core/cache.go b/internal/core/cache.go index a092f182..25394957 100644 --- a/internal/core/cache.go +++ b/internal/core/cache.go @@ -1,6 +1,38 @@ package core -// CacheService cache service interface that implement base Redis or other -type CacheService interface { - // TODO +import ( + "github.com/rocboss/paopao-ce/internal/model" +) + +const ( + IdxActNop IndexActionT = iota + 1 + IdxActCreatePost + IdxActUpdatePost + IdxActDeletePost + IdxActStickPost +) + +type IndexActionT uint8 + +func (a IndexActionT) String() string { + switch a { + case IdxActNop: + return "no operator" + case IdxActCreatePost: + return "create post" + case IdxActUpdatePost: + return "update post" + case IdxActDeletePost: + return "delete post" + case IdxActStickPost: + return "stick post" + default: + return "unknow action" + } +} + +// CacheIndexService cache index service interface +type CacheIndexService interface { + IndexPosts(offset int, limit int) ([]*model.PostFormated, bool) + SendAction(active IndexActionT) } diff --git a/internal/dao/cache_index.go b/internal/dao/cache_index.go new file mode 100644 index 00000000..c457987d --- /dev/null +++ b/internal/dao/cache_index.go @@ -0,0 +1,86 @@ +package dao + +import ( + "time" + + "github.com/rocboss/paopao-ce/internal/conf" + "github.com/rocboss/paopao-ce/internal/core" + "github.com/rocboss/paopao-ce/internal/model" + "github.com/sirupsen/logrus" +) + +func newSimpleCacheIndexServant(getIndexPosts func(offset, limit int) ([]*model.PostFormated, error)) *simpleCacheIndexServant { + s := conf.CacheIndexSetting + cacheIndex := &simpleCacheIndexServant{ + getIndexPosts: getIndexPosts, + maxIndexSize: s.MaxIndexSize, + indexPosts: make([]*model.PostFormated, 0), + indexActionCh: make(chan core.IndexActionT, 100), // optimize: size need configure by custom + checkTick: time.NewTicker(time.Duration(s.CheckTickDuration) * time.Second), // check whether need update index every 1 minute + expireIndexTick: time.NewTicker(time.Duration(s.ExpireTickDuration) * time.Second), // force expire index every 5 minute + } + + // start index posts + cacheIndex.atomicIndex.Store(cacheIndex.indexPosts) + go cacheIndex.startIndexPosts() + + return cacheIndex +} + +func (s *simpleCacheIndexServant) IndexPosts(offset int, limit int) ([]*model.PostFormated, bool) { + posts := s.atomicIndex.Load().([]*model.PostFormated) + start := offset * limit + end := start + limit + if len(posts) >= end { + logrus.Debugln("get index posts from cached") + return posts[start:end], true + } + return nil, false +} + +func (s *simpleCacheIndexServant) SendAction(act core.IndexActionT) { + select { + case s.indexActionCh <- act: + logrus.Debugf("send indexAction by chan: %s", act) + default: + go func(ch chan<- core.IndexActionT, act core.IndexActionT) { + logrus.Debugf("send indexAction by goroutine: %s", act) + ch <- act + }(s.indexActionCh, act) + } +} + +func (s *simpleCacheIndexServant) startIndexPosts() { + var err error + for { + select { + case <-s.checkTick.C: + if len(s.indexPosts) == 0 { + logrus.Debugf("index posts by checkTick") + if s.indexPosts, err = s.getIndexPosts(0, s.maxIndexSize); err == nil { + s.atomicIndex.Store(s.indexPosts) + } else { + logrus.Errorf("get index posts err: %v", err) + } + } + case <-s.expireIndexTick.C: + logrus.Debugf("expire index posts by expireIndexTick") + if len(s.indexPosts) != 0 { + s.indexPosts = nil + s.atomicIndex.Store(s.indexPosts) + } + case action := <-s.indexActionCh: + switch action { + case core.IdxActCreatePost, core.IdxActUpdatePost, core.IdxActDeletePost, core.IdxActStickPost: + // prevent many update post in least time + if len(s.indexPosts) != 0 { + logrus.Debugf("remove index posts by action %s", action) + s.indexPosts = nil + s.atomicIndex.Store(s.indexPosts) + } + default: + // nop + } + } + } +} diff --git a/internal/dao/dao.go b/internal/dao/dao.go index 96d6972c..df97701b 100644 --- a/internal/dao/dao.go +++ b/internal/dao/dao.go @@ -21,14 +21,20 @@ var ( _ core.ObjectStorageService = (*s3Servant)(nil) _ core.ObjectStorageService = (*localossServant)(nil) _ core.AttachmentCheckService = (*attachmentCheckServant)(nil) + _ core.CacheIndexService = (*simpleCacheIndexServant)(nil) ) type dataServant struct { + useCacheIndex bool + cacheIndex core.CacheIndexService + engine *gorm.DB zinc *zinc.ZincClient +} - useCacheIndex bool - indexActionCh chan indexActionT +type simpleCacheIndexServant struct { + getIndexPosts func(offset, limit int) ([]*model.PostFormated, error) + indexActionCh chan core.IndexActionT indexPosts []*model.PostFormated atomicIndex atomic.Value maxIndexSize int @@ -59,29 +65,18 @@ type attachmentCheckServant struct { } func NewDataService(engine *gorm.DB, zinc *zinc.ZincClient) core.DataService { - if !conf.CfgIf("CacheIndex") { - return &dataServant{ - engine: engine, - zinc: zinc, - useCacheIndex: false, - } - } - - s := conf.CacheIndexSetting ds := &dataServant{ - engine: engine, - zinc: zinc, - useCacheIndex: true, - maxIndexSize: conf.CacheIndexSetting.MaxIndexSize, - indexPosts: make([]*model.PostFormated, 0), - indexActionCh: make(chan indexActionT, 100), // optimize: size need configure by custom - checkTick: time.NewTicker(time.Duration(s.CheckTickDuration) * time.Second), // check whether need update index every 1 minute - expireIndexTick: time.NewTicker(time.Duration(s.ExpireTickDuration) * time.Second), // force expire index every 5 minute + engine: engine, + zinc: zinc, } - // start index posts - ds.atomicIndex.Store(ds.indexPosts) - go ds.startIndexPosts() + // initialize CacheIndex if needed + if !conf.CfgIf("CacheIndex") { + ds.useCacheIndex = false + } else { + ds.useCacheIndex = true + ds.cacheIndex = newSimpleCacheIndexServant(ds.getIndexPosts) + } return ds } diff --git a/internal/dao/post.go b/internal/dao/post.go index 8e9ba8ed..292e16f9 100644 --- a/internal/dao/post.go +++ b/internal/dao/post.go @@ -3,6 +3,7 @@ package dao import ( "time" + "github.com/rocboss/paopao-ce/internal/core" "github.com/rocboss/paopao-ce/internal/model" ) @@ -12,7 +13,7 @@ func (d *dataServant) CreatePost(post *model.Post) (*model.Post, error) { if err != nil { return nil, err } - d.indexActive(idxActCreatePost) + d.indexActive(core.IdxActCreatePost) return p, nil } @@ -20,7 +21,7 @@ func (d *dataServant) DeletePost(post *model.Post) error { if err := post.Delete(d.engine); err != nil { return err } - d.indexActive(idxActDeletePost) + d.indexActive(core.IdxActDeletePost) return nil } @@ -34,7 +35,7 @@ func (d *dataServant) StickPost(post *model.Post) error { if err := post.Update(d.engine); err != nil { return err } - d.indexActive(idxActStickPost) + d.indexActive(core.IdxActStickPost) return nil } @@ -59,7 +60,7 @@ func (d *dataServant) UpdatePost(post *model.Post) error { if err := post.Update(d.engine); err != nil { return err } - d.indexActive(idxActUpdatePost) + d.indexActive(core.IdxActUpdatePost) return nil } diff --git a/internal/dao/post_action.go b/internal/dao/post_action.go deleted file mode 100644 index 408ef6ce..00000000 --- a/internal/dao/post_action.go +++ /dev/null @@ -1,28 +0,0 @@ -package dao - -const ( - idxActNop indexActionT = iota + 1 - idxActCreatePost - idxActUpdatePost - idxActDeletePost - idxActStickPost -) - -type indexActionT uint8 - -func (a indexActionT) String() string { - switch a { - case idxActNop: - return "no operator" - case idxActCreatePost: - return "create post" - case idxActUpdatePost: - return "update post" - case idxActDeletePost: - return "delete post" - case idxActStickPost: - return "stick post" - default: - return "unknow action" - } -} diff --git a/internal/dao/post_index.go b/internal/dao/post_index.go index 56a8c63f..53a0913d 100644 --- a/internal/dao/post_index.go +++ b/internal/dao/post_index.go @@ -1,18 +1,16 @@ package dao import ( + "github.com/rocboss/paopao-ce/internal/core" "github.com/rocboss/paopao-ce/internal/model" "github.com/sirupsen/logrus" ) func (d *dataServant) IndexPosts(offset int, limit int) ([]*model.PostFormated, error) { if d.useCacheIndex { - posts := d.atomicIndex.Load().([]*model.PostFormated) - start := offset * limit - end := start + limit - if len(posts) >= end { + if posts, ok := d.cacheIndex.IndexPosts(offset, limit); ok { logrus.Debugln("get index posts from cached") - return posts[start:end], nil + return posts, nil } } logrus.Debugf("get index posts from database but useCacheIndex: %t", d.useCacheIndex) @@ -69,49 +67,8 @@ func (d *dataServant) getIndexPosts(offset int, limit int) ([]*model.PostFormate return d.MergePosts(posts) } -func (d *dataServant) indexActive(act indexActionT) { - select { - case d.indexActionCh <- act: - logrus.Debugf("send indexAction by chan: %s", act) - default: - go func(ch chan<- indexActionT, act indexActionT) { - logrus.Debugf("send indexAction by goroutine: %s", act) - ch <- act - }(d.indexActionCh, act) - } -} - -func (d *dataServant) startIndexPosts() { - var err error - for { - select { - case <-d.checkTick.C: - if len(d.indexPosts) == 0 { - logrus.Debugf("index posts by checkTick") - if d.indexPosts, err = d.getIndexPosts(0, d.maxIndexSize); err == nil { - d.atomicIndex.Store(d.indexPosts) - } else { - logrus.Errorf("get index posts err: %v", err) - } - } - case <-d.expireIndexTick.C: - logrus.Debugf("expire index posts by expireIndexTick") - if len(d.indexPosts) != 0 { - d.indexPosts = nil - d.atomicIndex.Store(d.indexPosts) - } - case action := <-d.indexActionCh: - switch action { - case idxActCreatePost, idxActUpdatePost, idxActDeletePost, idxActStickPost: - // prevent many update post in least time - if len(d.indexPosts) != 0 { - logrus.Debugf("remove index posts by action %s", action) - d.indexPosts = nil - d.atomicIndex.Store(d.indexPosts) - } - default: - // nop - } - } +func (d *dataServant) indexActive(act core.IndexActionT) { + if d.useCacheIndex { + d.cacheIndex.SendAction(act) } } diff --git a/internal/routers/api/post.go b/internal/routers/api/post.go index 942a130d..8b4f3703 100644 --- a/internal/routers/api/post.go +++ b/internal/routers/api/post.go @@ -23,14 +23,6 @@ func GetPostList(c *gin.Context) { } if q.Query == "" && q.Type == "search" { - // 直接读库 - // posts, err := service.GetPostList(&service.PostListReq{ - // Conditions: &model.ConditionsT{ - // "ORDER": "is_top DESC, latest_replied_on DESC", - // }, - // Offset: (app.GetPage(c) - 1) * app.GetPageSize(c), - // Limit: app.GetPageSize(c), - // }) offset, limit := app.GetPageOffset(c) posts, err := service.GetIndexPosts(offset, limit) if err != nil {