optimize #78 add abstract cache index servie

pull/78/head
alimy 3 years ago
parent c215566ede
commit 3c3071791e

@ -1,6 +1,38 @@
package core
// CacheService cache service interface that implement base Redis or other
type CacheService interface {
// TODO
import (
"github.com/rocboss/paopao-ce/internal/model"
)
const (
IdxActNop IndexActionT = iota + 1
IdxActCreatePost
IdxActUpdatePost
IdxActDeletePost
IdxActStickPost
)
type IndexActionT uint8
func (a IndexActionT) String() string {
switch a {
case IdxActNop:
return "no operator"
case IdxActCreatePost:
return "create post"
case IdxActUpdatePost:
return "update post"
case IdxActDeletePost:
return "delete post"
case IdxActStickPost:
return "stick post"
default:
return "unknow action"
}
}
// CacheIndexService cache index service interface
type CacheIndexService interface {
IndexPosts(offset int, limit int) ([]*model.PostFormated, bool)
SendAction(active IndexActionT)
}

@ -0,0 +1,86 @@
package dao
import (
"time"
"github.com/rocboss/paopao-ce/internal/conf"
"github.com/rocboss/paopao-ce/internal/core"
"github.com/rocboss/paopao-ce/internal/model"
"github.com/sirupsen/logrus"
)
func newSimpleCacheIndexServant(getIndexPosts func(offset, limit int) ([]*model.PostFormated, error)) *simpleCacheIndexServant {
s := conf.CacheIndexSetting
cacheIndex := &simpleCacheIndexServant{
getIndexPosts: getIndexPosts,
maxIndexSize: s.MaxIndexSize,
indexPosts: make([]*model.PostFormated, 0),
indexActionCh: make(chan core.IndexActionT, 100), // optimize: size need configure by custom
checkTick: time.NewTicker(time.Duration(s.CheckTickDuration) * time.Second), // check whether need update index every 1 minute
expireIndexTick: time.NewTicker(time.Duration(s.ExpireTickDuration) * time.Second), // force expire index every 5 minute
}
// start index posts
cacheIndex.atomicIndex.Store(cacheIndex.indexPosts)
go cacheIndex.startIndexPosts()
return cacheIndex
}
func (s *simpleCacheIndexServant) IndexPosts(offset int, limit int) ([]*model.PostFormated, bool) {
posts := s.atomicIndex.Load().([]*model.PostFormated)
start := offset * limit
end := start + limit
if len(posts) >= end {
logrus.Debugln("get index posts from cached")
return posts[start:end], true
}
return nil, false
}
func (s *simpleCacheIndexServant) SendAction(act core.IndexActionT) {
select {
case s.indexActionCh <- act:
logrus.Debugf("send indexAction by chan: %s", act)
default:
go func(ch chan<- core.IndexActionT, act core.IndexActionT) {
logrus.Debugf("send indexAction by goroutine: %s", act)
ch <- act
}(s.indexActionCh, act)
}
}
func (s *simpleCacheIndexServant) startIndexPosts() {
var err error
for {
select {
case <-s.checkTick.C:
if len(s.indexPosts) == 0 {
logrus.Debugf("index posts by checkTick")
if s.indexPosts, err = s.getIndexPosts(0, s.maxIndexSize); err == nil {
s.atomicIndex.Store(s.indexPosts)
} else {
logrus.Errorf("get index posts err: %v", err)
}
}
case <-s.expireIndexTick.C:
logrus.Debugf("expire index posts by expireIndexTick")
if len(s.indexPosts) != 0 {
s.indexPosts = nil
s.atomicIndex.Store(s.indexPosts)
}
case action := <-s.indexActionCh:
switch action {
case core.IdxActCreatePost, core.IdxActUpdatePost, core.IdxActDeletePost, core.IdxActStickPost:
// prevent many update post in least time
if len(s.indexPosts) != 0 {
logrus.Debugf("remove index posts by action %s", action)
s.indexPosts = nil
s.atomicIndex.Store(s.indexPosts)
}
default:
// nop
}
}
}
}

@ -21,14 +21,20 @@ var (
_ core.ObjectStorageService = (*s3Servant)(nil)
_ core.ObjectStorageService = (*localossServant)(nil)
_ core.AttachmentCheckService = (*attachmentCheckServant)(nil)
_ core.CacheIndexService = (*simpleCacheIndexServant)(nil)
)
type dataServant struct {
useCacheIndex bool
cacheIndex core.CacheIndexService
engine *gorm.DB
zinc *zinc.ZincClient
}
useCacheIndex bool
indexActionCh chan indexActionT
type simpleCacheIndexServant struct {
getIndexPosts func(offset, limit int) ([]*model.PostFormated, error)
indexActionCh chan core.IndexActionT
indexPosts []*model.PostFormated
atomicIndex atomic.Value
maxIndexSize int
@ -59,29 +65,18 @@ type attachmentCheckServant struct {
}
func NewDataService(engine *gorm.DB, zinc *zinc.ZincClient) core.DataService {
if !conf.CfgIf("CacheIndex") {
return &dataServant{
engine: engine,
zinc: zinc,
useCacheIndex: false,
}
}
s := conf.CacheIndexSetting
ds := &dataServant{
engine: engine,
zinc: zinc,
useCacheIndex: true,
maxIndexSize: conf.CacheIndexSetting.MaxIndexSize,
indexPosts: make([]*model.PostFormated, 0),
indexActionCh: make(chan indexActionT, 100), // optimize: size need configure by custom
checkTick: time.NewTicker(time.Duration(s.CheckTickDuration) * time.Second), // check whether need update index every 1 minute
expireIndexTick: time.NewTicker(time.Duration(s.ExpireTickDuration) * time.Second), // force expire index every 5 minute
}
// start index posts
ds.atomicIndex.Store(ds.indexPosts)
go ds.startIndexPosts()
// initialize CacheIndex if needed
if !conf.CfgIf("CacheIndex") {
ds.useCacheIndex = false
} else {
ds.useCacheIndex = true
ds.cacheIndex = newSimpleCacheIndexServant(ds.getIndexPosts)
}
return ds
}

@ -3,6 +3,7 @@ package dao
import (
"time"
"github.com/rocboss/paopao-ce/internal/core"
"github.com/rocboss/paopao-ce/internal/model"
)
@ -12,7 +13,7 @@ func (d *dataServant) CreatePost(post *model.Post) (*model.Post, error) {
if err != nil {
return nil, err
}
d.indexActive(idxActCreatePost)
d.indexActive(core.IdxActCreatePost)
return p, nil
}
@ -20,7 +21,7 @@ func (d *dataServant) DeletePost(post *model.Post) error {
if err := post.Delete(d.engine); err != nil {
return err
}
d.indexActive(idxActDeletePost)
d.indexActive(core.IdxActDeletePost)
return nil
}
@ -34,7 +35,7 @@ func (d *dataServant) StickPost(post *model.Post) error {
if err := post.Update(d.engine); err != nil {
return err
}
d.indexActive(idxActStickPost)
d.indexActive(core.IdxActStickPost)
return nil
}
@ -59,7 +60,7 @@ func (d *dataServant) UpdatePost(post *model.Post) error {
if err := post.Update(d.engine); err != nil {
return err
}
d.indexActive(idxActUpdatePost)
d.indexActive(core.IdxActUpdatePost)
return nil
}

@ -1,28 +0,0 @@
package dao
const (
idxActNop indexActionT = iota + 1
idxActCreatePost
idxActUpdatePost
idxActDeletePost
idxActStickPost
)
type indexActionT uint8
func (a indexActionT) String() string {
switch a {
case idxActNop:
return "no operator"
case idxActCreatePost:
return "create post"
case idxActUpdatePost:
return "update post"
case idxActDeletePost:
return "delete post"
case idxActStickPost:
return "stick post"
default:
return "unknow action"
}
}

@ -1,18 +1,16 @@
package dao
import (
"github.com/rocboss/paopao-ce/internal/core"
"github.com/rocboss/paopao-ce/internal/model"
"github.com/sirupsen/logrus"
)
func (d *dataServant) IndexPosts(offset int, limit int) ([]*model.PostFormated, error) {
if d.useCacheIndex {
posts := d.atomicIndex.Load().([]*model.PostFormated)
start := offset * limit
end := start + limit
if len(posts) >= end {
if posts, ok := d.cacheIndex.IndexPosts(offset, limit); ok {
logrus.Debugln("get index posts from cached")
return posts[start:end], nil
return posts, nil
}
}
logrus.Debugf("get index posts from database but useCacheIndex: %t", d.useCacheIndex)
@ -69,49 +67,8 @@ func (d *dataServant) getIndexPosts(offset int, limit int) ([]*model.PostFormate
return d.MergePosts(posts)
}
func (d *dataServant) indexActive(act indexActionT) {
select {
case d.indexActionCh <- act:
logrus.Debugf("send indexAction by chan: %s", act)
default:
go func(ch chan<- indexActionT, act indexActionT) {
logrus.Debugf("send indexAction by goroutine: %s", act)
ch <- act
}(d.indexActionCh, act)
}
}
func (d *dataServant) startIndexPosts() {
var err error
for {
select {
case <-d.checkTick.C:
if len(d.indexPosts) == 0 {
logrus.Debugf("index posts by checkTick")
if d.indexPosts, err = d.getIndexPosts(0, d.maxIndexSize); err == nil {
d.atomicIndex.Store(d.indexPosts)
} else {
logrus.Errorf("get index posts err: %v", err)
}
}
case <-d.expireIndexTick.C:
logrus.Debugf("expire index posts by expireIndexTick")
if len(d.indexPosts) != 0 {
d.indexPosts = nil
d.atomicIndex.Store(d.indexPosts)
}
case action := <-d.indexActionCh:
switch action {
case idxActCreatePost, idxActUpdatePost, idxActDeletePost, idxActStickPost:
// prevent many update post in least time
if len(d.indexPosts) != 0 {
logrus.Debugf("remove index posts by action %s", action)
d.indexPosts = nil
d.atomicIndex.Store(d.indexPosts)
}
default:
// nop
}
}
func (d *dataServant) indexActive(act core.IndexActionT) {
if d.useCacheIndex {
d.cacheIndex.SendAction(act)
}
}

@ -23,14 +23,6 @@ func GetPostList(c *gin.Context) {
}
if q.Query == "" && q.Type == "search" {
// 直接读库
// posts, err := service.GetPostList(&service.PostListReq{
// Conditions: &model.ConditionsT{
// "ORDER": "is_top DESC, latest_replied_on DESC",
// },
// Offset: (app.GetPage(c) - 1) * app.GetPageSize(c),
// Limit: app.GetPageSize(c),
// })
offset, limit := app.GetPageOffset(c)
posts, err := service.GetIndexPosts(offset, limit)
if err != nil {

Loading…
Cancel
Save