From 70f68a1c4e6d4df759e57c614458119dce6f69dd Mon Sep 17 00:00:00 2001 From: alimy Date: Sat, 16 Jul 2022 15:14:03 +0800 Subject: [PATCH] optimize BigCacheIndex logic --- internal/core/cache.go | 22 +++++++-- internal/dao/cache/bigcache.go | 84 ++++++++++++++++++++++++---------- internal/dao/cache/cache.go | 9 ++-- internal/dao/cache/none.go | 2 +- internal/dao/cache/simple.go | 8 ++-- internal/dao/jinzhu/tweets.go | 10 ++-- 6 files changed, 94 insertions(+), 41 deletions(-) diff --git a/internal/core/cache.go b/internal/core/cache.go index ecbc8e35..e4cf4cd9 100644 --- a/internal/core/cache.go +++ b/internal/core/cache.go @@ -1,7 +1,9 @@ package core +import "github.com/rocboss/paopao-ce/internal/model" + const ( - IdxActNop IndexActionT = iota + 1 + IdxActNop IdxAct = iota + 1 IdxActCreatePost IdxActUpdatePost IdxActDeletePost @@ -9,9 +11,14 @@ const ( IdxActVisiblePost ) -type IndexActionT uint8 +type IdxAct uint8 + +type IndexAction struct { + Act IdxAct + Post *model.Post +} -func (a IndexActionT) String() string { +func (a IdxAct) String() string { switch a { case IdxActNop: return "no operator" @@ -30,9 +37,16 @@ func (a IndexActionT) String() string { } } +func NewIndexAction(act IdxAct, post *model.Post) *IndexAction { + return &IndexAction{ + Act: act, + Post: post, + } +} + // CacheIndexService cache index service interface type CacheIndexService interface { IndexPostsService - SendAction(active IndexActionT) + SendAction(act IdxAct, post *model.Post) } diff --git a/internal/dao/cache/bigcache.go b/internal/dao/cache/bigcache.go index 5c5170b5..09e5c03c 100644 --- a/internal/dao/cache/bigcache.go +++ b/internal/dao/cache/bigcache.go @@ -4,6 +4,8 @@ import ( "bytes" "encoding/gob" "fmt" + "strconv" + "strings" "time" "github.com/Masterminds/semver/v3" @@ -27,10 +29,11 @@ type postsEntry struct { type bigCacheIndexServant struct { ips core.IndexPostsService - indexActionCh chan core.IndexActionT + indexActionCh chan *core.IndexAction cachePostsCh chan *postsEntry cache *bigcache.BigCache lastCacheResetTime time.Time + preventDuration time.Duration } func (s *bigCacheIndexServant) IndexPosts(user *model.User, offset int, limit int) (*rest.IndexTweetsResp, error) { @@ -99,15 +102,16 @@ func (s *bigCacheIndexServant) keyFrom(user *model.User, offset int, limit int) return fmt.Sprintf("index:%d:%d:%d", userId, offset, limit) } -func (s *bigCacheIndexServant) SendAction(act core.IndexActionT) { +func (s *bigCacheIndexServant) SendAction(act core.IdxAct, post *model.Post) { + action := core.NewIndexAction(act, post) select { - case s.indexActionCh <- act: + case s.indexActionCh <- action: logrus.Debugf("bigCacheIndexServant.SendAction send indexAction by chan: %s", act) default: - go func(ch chan<- core.IndexActionT, act core.IndexActionT) { - logrus.Debugf("bigCacheIndexServant.SendAction send indexAction by goroutine: %s", act) + go func(ch chan<- *core.IndexAction, act *core.IndexAction) { + logrus.Debugf("bigCacheIndexServant.SendAction send indexAction by goroutine: %s", action.Act) ch <- act - }(s.indexActionCh, act) + }(s.indexActionCh, action) } } @@ -117,30 +121,64 @@ func (s *bigCacheIndexServant) startIndexPosts() { case entry := <-s.cachePostsCh: s.setPosts(entry) case action := <-s.indexActionCh: - switch action { - // TODO: 这里列出来是因为后续可能会精细化处理每种情况 - case core.IdxActCreatePost, - core.IdxActUpdatePost, - core.IdxActDeletePost, - core.IdxActStickPost, - core.IdxActVisiblePost: - // TODO: 粗糙处理cache,后续需要针对每一种情况精细化处理 - if time.Since(s.lastCacheResetTime) > time.Minute { - s.cache.Reset() - s.lastCacheResetTime = time.Now() - logrus.Debugf("reset cache by %s", action) - } - default: - // nop - } + s.handleIndexAction(action) } } } +func (s *bigCacheIndexServant) handleIndexAction(action *core.IndexAction) { + act, post := action.Act, action.Post + + // 创建/删除 私密推文特殊处理 + switch act { + case core.IdxActCreatePost, core.IdxActDeletePost: + if post.Visibility == model.PostVisitPrivate { + s.deleteCacheByUserId(post.UserID) + return + } + } + + // 如果在s.preventDuration时间内就清除所有缓存,否则只清除自个儿的缓存 + // TODO: 需要优化只清除受影响的缓存,后续完善 + if time.Since(s.lastCacheResetTime) > s.preventDuration { + s.cache.Reset() + s.lastCacheResetTime = time.Now() + logrus.Debugf("bigCacheIndexServant.handleIndexAction reset cache by %s", action.Act) + } else { + s.deleteCacheByUserId(post.UserID) + } +} + +func (s *bigCacheIndexServant) deleteCacheByUserId(id int64) { + var keys []string + userId := strconv.FormatInt(id, 10) + + // 获取需要删除缓存的key,目前是仅删除自个儿的缓存 + for it := s.cache.Iterator(); it.SetNext(); { + entry, err := it.Value() + if err != nil { + logrus.Debugf("bigCacheIndexServant.deleteCacheByUserId usrId: %s err:%s", userId, err) + return + } + key := entry.Key() + keyParts := strings.Split(key, ":") + if len(keyParts) > 2 && keyParts[0] == "index" && keyParts[1] == userId { + keys = append(keys, key) + } + } + + // 执行删缓存 + for _, k := range keys { + s.cache.Delete(k) + } + s.lastCacheResetTime = time.Now() + logrus.Debugf("bigCacheIndexServant.deleteCacheByUserId userId:%d", id) +} + func (s *bigCacheIndexServant) Name() string { return "BigCacheIndex" } func (s *bigCacheIndexServant) Version() *semver.Version { - return semver.MustParse("v0.1.0") + return semver.MustParse("v0.2.0") } diff --git a/internal/dao/cache/cache.go b/internal/dao/cache/cache.go index 858c545f..fef86f8a 100644 --- a/internal/dao/cache/cache.go +++ b/internal/dao/cache/cache.go @@ -23,8 +23,9 @@ func NewBigCacheIndexService(indexPosts core.IndexPostsService) (core.CacheIndex } cacheIndex := &bigCacheIndexServant{ - ips: indexPosts, - cache: cache, + ips: indexPosts, + cache: cache, + preventDuration: 10 * time.Second, } // indexActionCh capacity custom configure by conf.yaml need in [10, 10000] @@ -35,7 +36,7 @@ func NewBigCacheIndexService(indexPosts core.IndexPostsService) (core.CacheIndex } else if capacity > 10000 { capacity = 10000 } - cacheIndex.indexActionCh = make(chan core.IndexActionT, capacity) + cacheIndex.indexActionCh = make(chan *core.IndexAction, capacity) cacheIndex.cachePostsCh = make(chan *postsEntry, capacity) // 启动索引更新器 @@ -69,7 +70,7 @@ func NewSimpleCacheIndexService(indexPosts core.IndexPostsService) (core.CacheIn } else if capacity > 10000 { capacity = 10000 } - cacheIndex.indexActionCh = make(chan core.IndexActionT, capacity) + cacheIndex.indexActionCh = make(chan core.IdxAct, capacity) // start index posts cacheIndex.atomicIndex.Store(cacheIndex.indexPosts) diff --git a/internal/dao/cache/none.go b/internal/dao/cache/none.go index 10935075..79705280 100644 --- a/internal/dao/cache/none.go +++ b/internal/dao/cache/none.go @@ -20,7 +20,7 @@ func (s *noneCacheIndexServant) IndexPosts(user *model.User, offset int, limit i return s.ips.IndexPosts(user, offset, limit) } -func (s *noneCacheIndexServant) SendAction(act core.IndexActionT) { +func (s *noneCacheIndexServant) SendAction(_act core.IdxAct, _post *model.Post) { // empty } diff --git a/internal/dao/cache/simple.go b/internal/dao/cache/simple.go index 9313dffc..afdea1be 100644 --- a/internal/dao/cache/simple.go +++ b/internal/dao/cache/simple.go @@ -19,7 +19,7 @@ var ( type simpleCacheIndexServant struct { ips core.IndexPostsService - indexActionCh chan core.IndexActionT + indexActionCh chan core.IdxAct indexPosts *rest.IndexTweetsResp atomicIndex atomic.Value maxIndexSize int @@ -45,12 +45,12 @@ func (s *simpleCacheIndexServant) IndexPosts(user *model.User, offset int, limit return s.ips.IndexPosts(user, offset, limit) } -func (s *simpleCacheIndexServant) SendAction(act core.IndexActionT) { +func (s *simpleCacheIndexServant) SendAction(act core.IdxAct, _post *model.Post) { select { case s.indexActionCh <- act: logrus.Debugf("simpleCacheIndexServant.SendAction send indexAction by chan: %s", act) default: - go func(ch chan<- core.IndexActionT, act core.IndexActionT) { + go func(ch chan<- core.IdxAct, act core.IdxAct) { logrus.Debugf("simpleCacheIndexServant.SendAction send indexAction by goroutine: %s", act) ch <- act }(s.indexActionCh, act) @@ -102,5 +102,5 @@ func (s *simpleCacheIndexServant) Name() string { } func (s *simpleCacheIndexServant) Version() *semver.Version { - return semver.MustParse("v0.1.0") + return semver.MustParse("v0.2.0") } diff --git a/internal/dao/jinzhu/tweets.go b/internal/dao/jinzhu/tweets.go index cbecff41..da8be500 100644 --- a/internal/dao/jinzhu/tweets.go +++ b/internal/dao/jinzhu/tweets.go @@ -166,7 +166,7 @@ func (s *tweetManageServant) CreatePost(post *model.Post) (*model.Post, error) { if err != nil { return nil, err } - s.cacheIndex.SendAction(core.IdxActCreatePost) + s.cacheIndex.SendAction(core.IdxActCreatePost, post) return p, nil } @@ -174,7 +174,7 @@ func (s *tweetManageServant) DeletePost(post *model.Post) error { if err := post.Delete(s.db); err != nil { return err } - s.cacheIndex.SendAction(core.IdxActDeletePost) + s.cacheIndex.SendAction(core.IdxActDeletePost, post) return nil } @@ -188,7 +188,7 @@ func (s *tweetManageServant) StickPost(post *model.Post) error { if err := post.Update(s.db); err != nil { return err } - s.cacheIndex.SendAction(core.IdxActStickPost) + s.cacheIndex.SendAction(core.IdxActStickPost, post) return nil } @@ -228,7 +228,7 @@ func (s *tweetManageServant) VisiblePost(post *model.Post, visibility model.Post } } db.Commit() - s.cacheIndex.SendAction(core.IdxActVisiblePost) + s.cacheIndex.SendAction(core.IdxActVisiblePost, post) return nil } @@ -236,7 +236,7 @@ func (s *tweetManageServant) UpdatePost(post *model.Post) error { if err := post.Update(s.db); err != nil { return err } - s.cacheIndex.SendAction(core.IdxActUpdatePost) + s.cacheIndex.SendAction(core.IdxActUpdatePost, post) return nil }