|
|
|
@ -4,6 +4,8 @@ import (
|
|
|
|
|
"bytes"
|
|
|
|
|
"encoding/gob"
|
|
|
|
|
"fmt"
|
|
|
|
|
"strconv"
|
|
|
|
|
"strings"
|
|
|
|
|
"time"
|
|
|
|
|
|
|
|
|
|
"github.com/Masterminds/semver/v3"
|
|
|
|
@ -27,10 +29,11 @@ type postsEntry struct {
|
|
|
|
|
type bigCacheIndexServant struct {
|
|
|
|
|
ips core.IndexPostsService
|
|
|
|
|
|
|
|
|
|
indexActionCh chan core.IndexActionT
|
|
|
|
|
indexActionCh chan *core.IndexAction
|
|
|
|
|
cachePostsCh chan *postsEntry
|
|
|
|
|
cache *bigcache.BigCache
|
|
|
|
|
lastCacheResetTime time.Time
|
|
|
|
|
preventDuration time.Duration
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (s *bigCacheIndexServant) IndexPosts(user *model.User, offset int, limit int) (*rest.IndexTweetsResp, error) {
|
|
|
|
@ -99,15 +102,16 @@ func (s *bigCacheIndexServant) keyFrom(user *model.User, offset int, limit int)
|
|
|
|
|
return fmt.Sprintf("index:%d:%d:%d", userId, offset, limit)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (s *bigCacheIndexServant) SendAction(act core.IndexActionT) {
|
|
|
|
|
func (s *bigCacheIndexServant) SendAction(act core.IdxAct, post *model.Post) {
|
|
|
|
|
action := core.NewIndexAction(act, post)
|
|
|
|
|
select {
|
|
|
|
|
case s.indexActionCh <- act:
|
|
|
|
|
case s.indexActionCh <- action:
|
|
|
|
|
logrus.Debugf("bigCacheIndexServant.SendAction send indexAction by chan: %s", act)
|
|
|
|
|
default:
|
|
|
|
|
go func(ch chan<- core.IndexActionT, act core.IndexActionT) {
|
|
|
|
|
logrus.Debugf("bigCacheIndexServant.SendAction send indexAction by goroutine: %s", act)
|
|
|
|
|
go func(ch chan<- *core.IndexAction, act *core.IndexAction) {
|
|
|
|
|
logrus.Debugf("bigCacheIndexServant.SendAction send indexAction by goroutine: %s", action.Act)
|
|
|
|
|
ch <- act
|
|
|
|
|
}(s.indexActionCh, act)
|
|
|
|
|
}(s.indexActionCh, action)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -117,30 +121,64 @@ func (s *bigCacheIndexServant) startIndexPosts() {
|
|
|
|
|
case entry := <-s.cachePostsCh:
|
|
|
|
|
s.setPosts(entry)
|
|
|
|
|
case action := <-s.indexActionCh:
|
|
|
|
|
switch action {
|
|
|
|
|
// TODO: 这里列出来是因为后续可能会精细化处理每种情况
|
|
|
|
|
case core.IdxActCreatePost,
|
|
|
|
|
core.IdxActUpdatePost,
|
|
|
|
|
core.IdxActDeletePost,
|
|
|
|
|
core.IdxActStickPost,
|
|
|
|
|
core.IdxActVisiblePost:
|
|
|
|
|
// TODO: 粗糙处理cache,后续需要针对每一种情况精细化处理
|
|
|
|
|
if time.Since(s.lastCacheResetTime) > time.Minute {
|
|
|
|
|
s.handleIndexAction(action)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (s *bigCacheIndexServant) handleIndexAction(action *core.IndexAction) {
|
|
|
|
|
act, post := action.Act, action.Post
|
|
|
|
|
|
|
|
|
|
// 创建/删除 私密推文特殊处理
|
|
|
|
|
switch act {
|
|
|
|
|
case core.IdxActCreatePost, core.IdxActDeletePost:
|
|
|
|
|
if post.Visibility == model.PostVisitPrivate {
|
|
|
|
|
s.deleteCacheByUserId(post.UserID)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// 如果在s.preventDuration时间内就清除所有缓存,否则只清除自个儿的缓存
|
|
|
|
|
// TODO: 需要优化只清除受影响的缓存,后续完善
|
|
|
|
|
if time.Since(s.lastCacheResetTime) > s.preventDuration {
|
|
|
|
|
s.cache.Reset()
|
|
|
|
|
s.lastCacheResetTime = time.Now()
|
|
|
|
|
logrus.Debugf("reset cache by %s", action)
|
|
|
|
|
logrus.Debugf("bigCacheIndexServant.handleIndexAction reset cache by %s", action.Act)
|
|
|
|
|
} else {
|
|
|
|
|
s.deleteCacheByUserId(post.UserID)
|
|
|
|
|
}
|
|
|
|
|
default:
|
|
|
|
|
// nop
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (s *bigCacheIndexServant) deleteCacheByUserId(id int64) {
|
|
|
|
|
var keys []string
|
|
|
|
|
userId := strconv.FormatInt(id, 10)
|
|
|
|
|
|
|
|
|
|
// 获取需要删除缓存的key,目前是仅删除自个儿的缓存
|
|
|
|
|
for it := s.cache.Iterator(); it.SetNext(); {
|
|
|
|
|
entry, err := it.Value()
|
|
|
|
|
if err != nil {
|
|
|
|
|
logrus.Debugf("bigCacheIndexServant.deleteCacheByUserId usrId: %s err:%s", userId, err)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
key := entry.Key()
|
|
|
|
|
keyParts := strings.Split(key, ":")
|
|
|
|
|
if len(keyParts) > 2 && keyParts[0] == "index" && keyParts[1] == userId {
|
|
|
|
|
keys = append(keys, key)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// 执行删缓存
|
|
|
|
|
for _, k := range keys {
|
|
|
|
|
s.cache.Delete(k)
|
|
|
|
|
}
|
|
|
|
|
s.lastCacheResetTime = time.Now()
|
|
|
|
|
logrus.Debugf("bigCacheIndexServant.deleteCacheByUserId userId:%d", id)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (s *bigCacheIndexServant) Name() string {
|
|
|
|
|
return "BigCacheIndex"
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (s *bigCacheIndexServant) Version() *semver.Version {
|
|
|
|
|
return semver.MustParse("v0.1.0")
|
|
|
|
|
return semver.MustParse("v0.2.0")
|
|
|
|
|
}
|
|
|
|
|