You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
paopao-ce/internal/dao/cache/base.go

231 lines
6.4 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

// Copyright 2023 ROC. All rights reserved.
// Use of this source code is governed by a MIT style
// license that can be found in the LICENSE file.
package cache
import (
"bytes"
"encoding/gob"
"fmt"
"strconv"
"strings"
"time"
"github.com/Masterminds/semver/v3"
"github.com/rocboss/paopao-ce/internal/conf"
"github.com/rocboss/paopao-ce/internal/core"
"github.com/rocboss/paopao-ce/internal/core/ms"
"github.com/rocboss/paopao-ce/pkg/types"
"github.com/sirupsen/logrus"
)
const (
_cacheIndexKey = "paopao_index"
)
var (
_ core.CacheIndexService = (*cacheIndexSrv)(nil)
_ core.VersionInfo = (*cacheIndexSrv)(nil)
)
type postsEntry struct {
key string
tweets *ms.IndexTweetList
}
type tweetsCache interface {
core.VersionInfo
getTweetsBytes(key string) ([]byte, error)
setTweetsBytes(key string, bs []byte) error
delTweets(keys []string) error
allKeys() ([]string, error)
}
type cacheIndexSrv struct {
ips core.IndexPostsService
ams core.AuthorizationManageService
name string
version *semver.Version
indexActionCh chan *core.IndexAction
cachePostsCh chan *postsEntry
cache tweetsCache
lastCacheResetTime time.Time
preventDuration time.Duration
}
func (s *cacheIndexSrv) IndexPosts(user *ms.User, offset int, limit int) (*ms.IndexTweetList, error) {
key := s.keyFrom(user, offset, limit)
posts, err := s.getPosts(key)
if err == nil {
logrus.Debugf("cacheIndexSrv.IndexPosts get index posts from cache by key: %s", key)
return posts, nil
}
if posts, err = s.ips.IndexPosts(user, offset, limit); err != nil {
return nil, err
}
logrus.Debugf("cacheIndexSrv.IndexPosts get index posts from database by key: %s", key)
s.cachePosts(key, posts)
return posts, nil
}
func (s *cacheIndexSrv) getPosts(key string) (*ms.IndexTweetList, error) {
data, err := s.cache.getTweetsBytes(key)
if err != nil {
logrus.Debugf("cacheIndexSrv.getPosts get posts by key: %s from cache err: %v", key, err)
return nil, err
}
buf := bytes.NewBuffer(data)
dec := gob.NewDecoder(buf)
var resp ms.IndexTweetList
if err := dec.Decode(&resp); err != nil {
logrus.Debugf("cacheIndexSrv.getPosts get posts from cache in decode err: %v", err)
return nil, err
}
return &resp, nil
}
func (s *cacheIndexSrv) cachePosts(key string, tweets *ms.IndexTweetList) {
entry := &postsEntry{key: key, tweets: tweets}
select {
case s.cachePostsCh <- entry:
logrus.Debugf("cacheIndexSrv.cachePosts cachePosts by chan of key: %s", key)
default:
go func(ch chan<- *postsEntry, entry *postsEntry) {
logrus.Debugf("cacheIndexSrv.cachePosts cachePosts indexAction by goroutine of key: %s", key)
ch <- entry
}(s.cachePostsCh, entry)
}
}
func (s *cacheIndexSrv) setPosts(entry *postsEntry) {
var buf bytes.Buffer
enc := gob.NewEncoder(&buf)
if err := enc.Encode(entry.tweets); err != nil {
logrus.Debugf("cacheIndexSrv.setPosts setPosts encode post entry err: %v", err)
return
}
if err := s.cache.setTweetsBytes(entry.key, buf.Bytes()); err != nil {
logrus.Debugf("cacheIndexSrv.setPosts setPosts set cache err: %v", err)
}
logrus.Debugf("cacheIndexSrv.setPosts setPosts set cache by key: %s", entry.key)
}
func (s *cacheIndexSrv) keyFrom(user *ms.User, offset int, limit int) string {
var userId int64 = -1
if user != nil {
userId = user.ID
}
return fmt.Sprintf("%s:%d:%d:%d", _cacheIndexKey, userId, offset, limit)
}
func (s *cacheIndexSrv) SendAction(act core.IdxAct, post *ms.Post) {
action := core.NewIndexAction(act, post)
select {
case s.indexActionCh <- action:
logrus.Debugf("cacheIndexSrv.SendAction send indexAction by chan: %s", act)
default:
go func(ch chan<- *core.IndexAction, act *core.IndexAction) {
logrus.Debugf("cacheIndexSrv.SendAction send indexAction by goroutine: %s", action.Act)
ch <- act
}(s.indexActionCh, action)
}
}
func (s *cacheIndexSrv) startIndexPosts() {
for {
select {
case entry := <-s.cachePostsCh:
s.setPosts(entry)
case action := <-s.indexActionCh:
s.handleIndexAction(action)
}
}
}
func (s *cacheIndexSrv) handleIndexAction(action *core.IndexAction) {
act, post := action.Act, action.Post
// 创建/删除 私密推文特殊处理
switch act {
case core.IdxActCreatePost, core.IdxActDeletePost:
if post.Visibility == core.PostVisitPrivate {
s.deleteCacheByUserId(post.UserID, true)
return
}
}
// 如果在s.preventDuration时间内就清除所有缓存否则只清除自个儿的缓存
// TODO: 需要优化只清除受影响的缓存,后续完善
if time.Since(s.lastCacheResetTime) > s.preventDuration {
s.deleteCacheByUserId(post.UserID, false)
} else {
s.deleteCacheByUserId(post.UserID, true)
}
}
func (s *cacheIndexSrv) deleteCacheByUserId(id int64, oneself bool) {
var keys []string
userId := strconv.FormatInt(id, 10)
friendSet := ms.FriendSet{}
if !oneself {
friendSet = s.ams.MyFriendSet(id)
}
friendSet[userId] = types.Empty{}
// 获取需要删除缓存的key目前是仅删除自个儿的缓存
allKeys, err := s.cache.allKeys()
if err != nil {
logrus.Debugf("cacheIndexSrv.deleteCacheByUserId userId: %s err:%s", userId, err)
}
for _, key := range allKeys {
keyParts := strings.Split(key, ":")
if len(keyParts) > 2 && keyParts[0] == _cacheIndexKey {
if _, ok := friendSet[keyParts[1]]; ok {
keys = append(keys, key)
}
}
}
// 执行删缓存
s.cache.delTweets(keys)
s.lastCacheResetTime = time.Now()
logrus.Debugf("cacheIndexSrv.deleteCacheByUserId userId:%s oneself:%t keys:%d", userId, oneself, len(keys))
}
func (s *cacheIndexSrv) Name() string {
return s.name
}
func (s *cacheIndexSrv) Version() *semver.Version {
return s.version
}
func newCacheIndexSrv(ips core.IndexPostsService, ams core.AuthorizationManageService, tc tweetsCache) *cacheIndexSrv {
cacheIndex := &cacheIndexSrv{
ips: ips,
ams: ams,
cache: tc,
name: tc.Name(),
version: tc.Version(),
preventDuration: 10 * time.Second,
}
// indexActionCh capacity custom configure by conf.yaml need in [10, 10000]
// or re-compile source to adjust min/max capacity
capacity := conf.CacheIndexSetting.MaxUpdateQPS
if capacity < 10 {
capacity = 10
} else if capacity > 10000 {
capacity = 10000
}
cacheIndex.indexActionCh = make(chan *core.IndexAction, capacity)
cacheIndex.cachePostsCh = make(chan *postsEntry, capacity)
// 启动索引更新器
go cacheIndex.startIndexPosts()
return cacheIndex
}