Merge pull request #153 from alimy/pr-optimize-cache

optimize BigCacheIndex logic
pull/154/head
Michael Li 2 years ago committed by GitHub
commit 498ce3f216
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -1,7 +1,9 @@
package core
import "github.com/rocboss/paopao-ce/internal/model"
const (
IdxActNop IndexActionT = iota + 1
IdxActNop IdxAct = iota + 1
IdxActCreatePost
IdxActUpdatePost
IdxActDeletePost
@ -9,9 +11,14 @@ const (
IdxActVisiblePost
)
type IndexActionT uint8
type IdxAct uint8
type IndexAction struct {
Act IdxAct
Post *model.Post
}
func (a IndexActionT) String() string {
func (a IdxAct) String() string {
switch a {
case IdxActNop:
return "no operator"
@ -30,9 +37,16 @@ func (a IndexActionT) String() string {
}
}
func NewIndexAction(act IdxAct, post *model.Post) *IndexAction {
return &IndexAction{
Act: act,
Post: post,
}
}
// CacheIndexService cache index service interface
type CacheIndexService interface {
IndexPostsService
SendAction(active IndexActionT)
SendAction(act IdxAct, post *model.Post)
}

@ -4,6 +4,8 @@ import (
"bytes"
"encoding/gob"
"fmt"
"strconv"
"strings"
"time"
"github.com/Masterminds/semver/v3"
@ -27,10 +29,11 @@ type postsEntry struct {
type bigCacheIndexServant struct {
ips core.IndexPostsService
indexActionCh chan core.IndexActionT
indexActionCh chan *core.IndexAction
cachePostsCh chan *postsEntry
cache *bigcache.BigCache
lastCacheResetTime time.Time
preventDuration time.Duration
}
func (s *bigCacheIndexServant) IndexPosts(user *model.User, offset int, limit int) (*rest.IndexTweetsResp, error) {
@ -99,15 +102,16 @@ func (s *bigCacheIndexServant) keyFrom(user *model.User, offset int, limit int)
return fmt.Sprintf("index:%d:%d:%d", userId, offset, limit)
}
func (s *bigCacheIndexServant) SendAction(act core.IndexActionT) {
func (s *bigCacheIndexServant) SendAction(act core.IdxAct, post *model.Post) {
action := core.NewIndexAction(act, post)
select {
case s.indexActionCh <- act:
case s.indexActionCh <- action:
logrus.Debugf("bigCacheIndexServant.SendAction send indexAction by chan: %s", act)
default:
go func(ch chan<- core.IndexActionT, act core.IndexActionT) {
logrus.Debugf("bigCacheIndexServant.SendAction send indexAction by goroutine: %s", act)
go func(ch chan<- *core.IndexAction, act *core.IndexAction) {
logrus.Debugf("bigCacheIndexServant.SendAction send indexAction by goroutine: %s", action.Act)
ch <- act
}(s.indexActionCh, act)
}(s.indexActionCh, action)
}
}
@ -117,30 +121,64 @@ func (s *bigCacheIndexServant) startIndexPosts() {
case entry := <-s.cachePostsCh:
s.setPosts(entry)
case action := <-s.indexActionCh:
switch action {
// TODO: 这里列出来是因为后续可能会精细化处理每种情况
case core.IdxActCreatePost,
core.IdxActUpdatePost,
core.IdxActDeletePost,
core.IdxActStickPost,
core.IdxActVisiblePost:
// TODO: 粗糙处理cache后续需要针对每一种情况精细化处理
if time.Since(s.lastCacheResetTime) > time.Minute {
s.cache.Reset()
s.lastCacheResetTime = time.Now()
logrus.Debugf("reset cache by %s", action)
}
default:
// nop
}
s.handleIndexAction(action)
}
}
}
func (s *bigCacheIndexServant) handleIndexAction(action *core.IndexAction) {
act, post := action.Act, action.Post
// 创建/删除 私密推文特殊处理
switch act {
case core.IdxActCreatePost, core.IdxActDeletePost:
if post.Visibility == model.PostVisitPrivate {
s.deleteCacheByUserId(post.UserID)
return
}
}
// 如果在s.preventDuration时间内就清除所有缓存否则只清除自个儿的缓存
// TODO: 需要优化只清除受影响的缓存,后续完善
if time.Since(s.lastCacheResetTime) > s.preventDuration {
s.cache.Reset()
s.lastCacheResetTime = time.Now()
logrus.Debugf("bigCacheIndexServant.handleIndexAction reset cache by %s", action.Act)
} else {
s.deleteCacheByUserId(post.UserID)
}
}
func (s *bigCacheIndexServant) deleteCacheByUserId(id int64) {
var keys []string
userId := strconv.FormatInt(id, 10)
// 获取需要删除缓存的key目前是仅删除自个儿的缓存
for it := s.cache.Iterator(); it.SetNext(); {
entry, err := it.Value()
if err != nil {
logrus.Debugf("bigCacheIndexServant.deleteCacheByUserId usrId: %s err:%s", userId, err)
return
}
key := entry.Key()
keyParts := strings.Split(key, ":")
if len(keyParts) > 2 && keyParts[0] == "index" && keyParts[1] == userId {
keys = append(keys, key)
}
}
// 执行删缓存
for _, k := range keys {
s.cache.Delete(k)
}
s.lastCacheResetTime = time.Now()
logrus.Debugf("bigCacheIndexServant.deleteCacheByUserId userId:%d", id)
}
func (s *bigCacheIndexServant) Name() string {
return "BigCacheIndex"
}
func (s *bigCacheIndexServant) Version() *semver.Version {
return semver.MustParse("v0.1.0")
return semver.MustParse("v0.2.0")
}

@ -23,8 +23,9 @@ func NewBigCacheIndexService(indexPosts core.IndexPostsService) (core.CacheIndex
}
cacheIndex := &bigCacheIndexServant{
ips: indexPosts,
cache: cache,
ips: indexPosts,
cache: cache,
preventDuration: 10 * time.Second,
}
// indexActionCh capacity custom configure by conf.yaml need in [10, 10000]
@ -35,7 +36,7 @@ func NewBigCacheIndexService(indexPosts core.IndexPostsService) (core.CacheIndex
} else if capacity > 10000 {
capacity = 10000
}
cacheIndex.indexActionCh = make(chan core.IndexActionT, capacity)
cacheIndex.indexActionCh = make(chan *core.IndexAction, capacity)
cacheIndex.cachePostsCh = make(chan *postsEntry, capacity)
// 启动索引更新器
@ -69,7 +70,7 @@ func NewSimpleCacheIndexService(indexPosts core.IndexPostsService) (core.CacheIn
} else if capacity > 10000 {
capacity = 10000
}
cacheIndex.indexActionCh = make(chan core.IndexActionT, capacity)
cacheIndex.indexActionCh = make(chan core.IdxAct, capacity)
// start index posts
cacheIndex.atomicIndex.Store(cacheIndex.indexPosts)

@ -20,7 +20,7 @@ func (s *noneCacheIndexServant) IndexPosts(user *model.User, offset int, limit i
return s.ips.IndexPosts(user, offset, limit)
}
func (s *noneCacheIndexServant) SendAction(act core.IndexActionT) {
func (s *noneCacheIndexServant) SendAction(_act core.IdxAct, _post *model.Post) {
// empty
}

@ -19,7 +19,7 @@ var (
type simpleCacheIndexServant struct {
ips core.IndexPostsService
indexActionCh chan core.IndexActionT
indexActionCh chan core.IdxAct
indexPosts *rest.IndexTweetsResp
atomicIndex atomic.Value
maxIndexSize int
@ -45,12 +45,12 @@ func (s *simpleCacheIndexServant) IndexPosts(user *model.User, offset int, limit
return s.ips.IndexPosts(user, offset, limit)
}
func (s *simpleCacheIndexServant) SendAction(act core.IndexActionT) {
func (s *simpleCacheIndexServant) SendAction(act core.IdxAct, _post *model.Post) {
select {
case s.indexActionCh <- act:
logrus.Debugf("simpleCacheIndexServant.SendAction send indexAction by chan: %s", act)
default:
go func(ch chan<- core.IndexActionT, act core.IndexActionT) {
go func(ch chan<- core.IdxAct, act core.IdxAct) {
logrus.Debugf("simpleCacheIndexServant.SendAction send indexAction by goroutine: %s", act)
ch <- act
}(s.indexActionCh, act)
@ -102,5 +102,5 @@ func (s *simpleCacheIndexServant) Name() string {
}
func (s *simpleCacheIndexServant) Version() *semver.Version {
return semver.MustParse("v0.1.0")
return semver.MustParse("v0.2.0")
}

@ -166,7 +166,7 @@ func (s *tweetManageServant) CreatePost(post *model.Post) (*model.Post, error) {
if err != nil {
return nil, err
}
s.cacheIndex.SendAction(core.IdxActCreatePost)
s.cacheIndex.SendAction(core.IdxActCreatePost, post)
return p, nil
}
@ -174,7 +174,7 @@ func (s *tweetManageServant) DeletePost(post *model.Post) error {
if err := post.Delete(s.db); err != nil {
return err
}
s.cacheIndex.SendAction(core.IdxActDeletePost)
s.cacheIndex.SendAction(core.IdxActDeletePost, post)
return nil
}
@ -188,7 +188,7 @@ func (s *tweetManageServant) StickPost(post *model.Post) error {
if err := post.Update(s.db); err != nil {
return err
}
s.cacheIndex.SendAction(core.IdxActStickPost)
s.cacheIndex.SendAction(core.IdxActStickPost, post)
return nil
}
@ -228,7 +228,7 @@ func (s *tweetManageServant) VisiblePost(post *model.Post, visibility model.Post
}
}
db.Commit()
s.cacheIndex.SendAction(core.IdxActVisiblePost)
s.cacheIndex.SendAction(core.IdxActVisiblePost, post)
return nil
}
@ -236,7 +236,7 @@ func (s *tweetManageServant) UpdatePost(post *model.Post) error {
if err := post.Update(s.db); err != nil {
return err
}
s.cacheIndex.SendAction(core.IdxActUpdatePost)
s.cacheIndex.SendAction(core.IdxActUpdatePost, post)
return nil
}

Loading…
Cancel
Save