Merge pull request #115 from alimy/pr-bigcache

add BigCacheIndex for cache index posts
pull/116/head
Michael Li 3 years ago committed by GitHub
commit 8eb3628a1b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -274,8 +274,9 @@ Usage of release/paopao-ce:
* 数据库: MySQL/Sqlite3/PostgreSQL
* 对象存储: AliOSS/MinIO/LocalOSS
`LocalOSS` 提供使用本地目录文件作为对象存储的功能,仅用于开发调试环境;
* 缓存: Redis/SimpleCacheIndex
`SimpleCacheIndex`提供 广场文章列表 的缓存功能;
* 缓存: Redis/SimpleCacheIndex/BigCacheIndex
`SimpleCacheIndex`提供简单的 广场推文列表 的缓存功能;
`BigCacheIndex` 使用[BigCache](https://github.com/allegro/bigcache)缓存 广场推文列表,缓存每个用户每一页,简单做到千人千面;
* 搜索: Zinc
* 日志: LoggerFile/LoggerZinc
`LoggerFile` 使用文件写日志;

@ -13,7 +13,7 @@ Server: # 服务设置
WriteTimeout: 60
Features:
Default: ["Base", "MySQL", "Option", "LocalOSS", "LoggerFile"]
Develop: ["Base", "MySQL", "Option", "Sms", "AliOSS", "LoggerZinc"]
Develop: ["Base", "MySQL", "BigCacheIndex", "Sms", "AliOSS", "LoggerZinc"]
Demo: ["Base", "MySQL", "Option", "Sms", "MinIO", "LoggerZinc"]
Slim: ["Base", "Sqlite3", "LocalOSS", "LoggerFile"]
Base: ["Zinc", "Redis", "Alipay",]
@ -31,6 +31,11 @@ SimpleCacheIndex: # 缓存泡泡广场消息流
CheckTickDuration: 60 # 循环自检查每多少秒一次
ExpireTickDuration: 300 # 每多少秒后强制过期缓存, 设置为0禁止强制使缓存过期
ActionQPS: 100 # 添加/删除/更新Post的QPS, 默认100范围设置[10, 10000]
BigCacheIndex: # 使用BigCache缓存泡泡广场消息流
MaxIndexPage: 1024 # 最大缓存页数必须是2^n, 代表最大同时缓存多少页数据
Verbose: False # 是否打印cache操作的log
ExpireInSecond: 300 # 多少秒(>0)后强制过期缓存
UpdateQPS: 100 # 添加/删除/更新Post的QPS, 默认100
LoggerFile: # 使用File写日志
SavePath: data/paopao-ce/logs
FileName: app

@ -3,8 +3,10 @@ module github.com/rocboss/paopao-ce
go 1.16
require (
github.com/Masterminds/semver/v3 v3.1.1
github.com/afocus/captcha v0.0.0-20191010092841-4bd1f21c8868
github.com/aliyun/aliyun-oss-go-sdk v2.2.2+incompatible
github.com/allegro/bigcache/v3 v3.0.2
github.com/baiyubin/aliyun-sts-go-sdk v0.0.0-20180326062324-cfa1a18b161f // indirect
github.com/dgrijalva/jwt-go v3.2.0+incompatible
github.com/disintegration/imaging v1.6.2

@ -82,7 +82,10 @@ github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRF
github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/aliyun/aliyun-oss-go-sdk v2.2.2+incompatible h1:9gWa46nstkJ9miBReJcN8Gq34cBFbzSpQZVVT9N09TM=
github.com/aliyun/aliyun-oss-go-sdk v2.2.2+incompatible/go.mod h1:T/Aws4fEfogEE9v+HPhhw+CntffsBHJ8nXQCwKr0/g8=
github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156 h1:eMwmnE/GDgah4HI848JfFxHt+iPb26b4zyfspmqY0/8=
github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156/go.mod h1:Cb/ax3seSYIx7SuZdm2G2xzfwmv3TPSk2ucNfQESPXM=
github.com/allegro/bigcache/v3 v3.0.2 h1:AKZCw+5eAaVyNTBmI2fgyPVJhHkdWder3O9IrprcQfI=
github.com/allegro/bigcache/v3 v3.0.2/go.mod h1:aPyh7jEvrog9zAwx5N7+JUQX5dZTSGpxF1LAR4dr35I=
github.com/andreyvit/diff v0.0.0-20170406064948-c7f18ee00883/go.mod h1:rCTlJbsFo29Kk6CurOXKm700vrz8f0KW0JNfpkRJY/8=
github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
github.com/apache/arrow/go/arrow v0.0.0-20191024131854-af6fa24be0db/go.mod h1:VTxUBvSJ3s3eHAg65PNgrsn5BtqCRPdmyXh6rAfdxN0=

@ -19,6 +19,7 @@ var (
ServerSetting *ServerSettingS
AppSetting *AppSettingS
SimpleCacheIndexSetting *SimpleCacheIndexSettingS
BigCacheIndexSetting *BigCacheIndexSettingS
SmsJuheSetting *SmsJuheSettings
AlipaySetting *AlipaySettingS
ZincSetting *ZincSettingS
@ -47,6 +48,7 @@ func setupSetting(suite []string, noDefault bool) error {
"App": &AppSetting,
"Server": &ServerSetting,
"SimpleCacheIndex": &SimpleCacheIndexSetting,
"BigCacheIndex": &BigCacheIndexSetting,
"Alipay": &AlipaySetting,
"SmsJuhe": &SmsJuheSetting,
"LoggerFile": &loggerFileSetting,
@ -70,6 +72,10 @@ func setupSetting(suite []string, noDefault bool) error {
JWTSetting.Expire *= time.Second
ServerSetting.ReadTimeout *= time.Second
ServerSetting.WriteTimeout *= time.Second
SimpleCacheIndexSetting.CheckTickDuration *= time.Second
SimpleCacheIndexSetting.ExpireTickDuration *= time.Second
BigCacheIndexSetting.ExpireInSecond *= time.Second
Mutex = &sync.Mutex{}
return nil
}

@ -49,11 +49,18 @@ type AppSettingS struct {
type SimpleCacheIndexSettingS struct {
MaxIndexSize int
CheckTickDuration int
ExpireTickDuration int
CheckTickDuration time.Duration
ExpireTickDuration time.Duration
ActionQPS int
}
type BigCacheIndexSettingS struct {
MaxIndexPage int
ExpireInSecond time.Duration
Verbose bool
UpdateQPS int
}
type AlipaySettingS struct {
AppID string
PrivateKey string

@ -32,6 +32,7 @@ func (a IndexActionT) String() string {
// CacheIndexService cache index service interface
type CacheIndexService interface {
VersionInfo
IndexPostsService
SendAction(active IndexActionT)
}

@ -0,0 +1,10 @@
package core
import (
"github.com/Masterminds/semver/v3"
)
type VersionInfo interface {
Name() string
Version() *semver.Version
}

@ -0,0 +1,157 @@
package dao
import (
"bytes"
"encoding/gob"
"fmt"
"time"
"github.com/Masterminds/semver/v3"
"github.com/allegro/bigcache/v3"
"github.com/rocboss/paopao-ce/internal/conf"
"github.com/rocboss/paopao-ce/internal/core"
"github.com/rocboss/paopao-ce/internal/model"
"github.com/sirupsen/logrus"
)
func newBigCacheIndexServant(getIndexPosts indexPostsFunc) *bigCacheIndexServant {
s := conf.BigCacheIndexSetting
config := bigcache.DefaultConfig(s.ExpireInSecond)
config.Shards = s.MaxIndexPage
config.Verbose = s.Verbose
config.MaxEntrySize = 10000
config.Logger = logrus.StandardLogger()
cache, err := bigcache.NewBigCache(config)
if err != nil {
logrus.Fatalf("initial bigCahceIndex failure by err: %v", err)
}
cacheIndex := &bigCacheIndexServant{
getIndexPosts: getIndexPosts,
cache: cache,
}
// indexActionCh capacity custom configure by conf.yaml need in [10, 10000]
// or re-compile source to adjust min/max capacity
capacity := s.UpdateQPS
if capacity < 10 {
capacity = 10
} else if capacity > 10000 {
capacity = 10000
}
cacheIndex.indexActionCh = make(chan core.IndexActionT, capacity)
cacheIndex.cachePostsCh = make(chan *postsEntry, capacity)
go cacheIndex.startIndexPosts()
return cacheIndex
}
func (s *bigCacheIndexServant) IndexPosts(userId int64, offset int, limit int) ([]*model.PostFormated, error) {
key := s.keyFrom(userId, offset, limit)
posts, err := s.getPosts(key)
if err == nil {
logrus.Debugf("get index posts from cache by key: %s userId: %d offset:%d limit:%d", key, userId, offset, limit)
return posts, nil
}
if posts, err = s.getIndexPosts(userId, offset, limit); err != nil {
return nil, err
}
logrus.Debugf("get index posts from database by userId: %d offset:%d limit:%d", userId, offset, limit)
s.cachePosts(key, posts)
return posts, nil
}
func (s *bigCacheIndexServant) getPosts(key string) ([]*model.PostFormated, error) {
data, err := s.cache.Get(key)
if err != nil {
logrus.Debugf("get posts by key: %s from cache err: %v", key, err)
return nil, err
}
buf := bytes.NewBuffer(data)
dec := gob.NewDecoder(buf)
var posts []*model.PostFormated
if err := dec.Decode(&posts); err != nil {
logrus.Debugf("get posts from cache in decode err: %v", err)
return nil, err
}
return posts, nil
}
func (s *bigCacheIndexServant) cachePosts(key string, posts []*model.PostFormated) {
entry := &postsEntry{key: key, posts: posts}
select {
case s.cachePostsCh <- entry:
logrus.Debugf("send indexAction by chan of key: %s", key)
default:
go func(ch chan<- *postsEntry, entry *postsEntry) {
logrus.Debugf("send indexAction by goroutine of key: %s", key)
ch <- entry
}(s.cachePostsCh, entry)
}
}
func (s *bigCacheIndexServant) setPosts(entry *postsEntry) {
var buf bytes.Buffer
enc := gob.NewEncoder(&buf)
if err := enc.Encode(entry.posts); err != nil {
logrus.Debugf("setPosts encode post entry err: %v", err)
return
}
if err := s.cache.Set(entry.key, buf.Bytes()); err != nil {
logrus.Debugf("setPosts set cache err: %v", err)
}
logrus.Debugf("setPosts set cache by key: %s", entry.key)
}
func (s *bigCacheIndexServant) keyFrom(userId int64, offset int, limit int) string {
return fmt.Sprintf("index:%d:%d:%d", userId, offset, limit)
}
func (s *bigCacheIndexServant) SendAction(act core.IndexActionT) {
select {
case s.indexActionCh <- act:
logrus.Debugf("send indexAction by chan: %s", act)
default:
go func(ch chan<- core.IndexActionT, act core.IndexActionT) {
logrus.Debugf("send indexAction by goroutine: %s", act)
ch <- act
}(s.indexActionCh, act)
}
}
func (s *bigCacheIndexServant) startIndexPosts() {
for {
select {
case entry := <-s.cachePostsCh:
s.setPosts(entry)
case action := <-s.indexActionCh:
switch action {
// TODO: 这里列出来是因为后续可能会精细化处理每种情况
case core.IdxActCreatePost,
core.IdxActUpdatePost,
core.IdxActDeletePost,
core.IdxActStickPost,
core.IdxActVisiblePost:
// TODO: 粗糙处理cache后续需要针对每一种情况精细化处理
if time.Since(s.lastCacheResetTime) > time.Minute {
s.cache.Reset()
s.lastCacheResetTime = time.Now()
logrus.Debugf("reset cache by %s", action)
}
default:
// nop
}
}
}
}
func (s *bigCacheIndexServant) Name() string {
return "BigCacheIndex"
}
func (s *bigCacheIndexServant) Version() *semver.Version {
return semver.MustParse("v0.1.0")
}

@ -4,6 +4,7 @@ import (
"errors"
"time"
"github.com/Masterminds/semver/v3"
"github.com/rocboss/paopao-ce/internal/conf"
"github.com/rocboss/paopao-ce/internal/core"
"github.com/rocboss/paopao-ce/internal/model"
@ -20,13 +21,13 @@ func newSimpleCacheIndexServant(getIndexPosts indexPostsFunc) *simpleCacheIndexS
getIndexPosts: getIndexPosts,
maxIndexSize: s.MaxIndexSize,
indexPosts: make([]*model.PostFormated, 0),
checkTick: time.NewTicker(time.Duration(s.CheckTickDuration) * time.Second), // check whether need update index every 1 minute
checkTick: time.NewTicker(s.CheckTickDuration), // check whether need update index every 1 minute
expireIndexTick: time.NewTicker(time.Second),
}
// force expire index every ExpireTickDuration second
if s.ExpireTickDuration != 0 {
cacheIndex.expireIndexTick.Reset(time.Duration(s.CheckTickDuration) * time.Second)
cacheIndex.expireIndexTick.Reset(s.CheckTickDuration)
} else {
cacheIndex.expireIndexTick.Stop()
}
@ -110,3 +111,11 @@ func (s *simpleCacheIndexServant) startIndexPosts() {
}
}
}
func (s *simpleCacheIndexServant) Name() string {
return "SimpleCacheIndex"
}
func (s *simpleCacheIndexServant) Version() *semver.Version {
return semver.MustParse("v0.1.0")
}

@ -5,6 +5,7 @@ import (
"time"
"github.com/aliyun/aliyun-oss-go-sdk/oss"
"github.com/allegro/bigcache/v3"
"github.com/minio/minio-go/v7"
"github.com/rocboss/paopao-ce/internal/conf"
"github.com/rocboss/paopao-ce/internal/core"
@ -22,6 +23,7 @@ var (
_ core.ObjectStorageService = (*localossServant)(nil)
_ core.AttachmentCheckService = (*attachmentCheckServant)(nil)
_ core.CacheIndexService = (*simpleCacheIndexServant)(nil)
_ core.CacheIndexService = (*bigCacheIndexServant)(nil)
)
type dataServant struct {
@ -43,6 +45,18 @@ type simpleCacheIndexServant struct {
expireIndexTick *time.Ticker
}
type postsEntry struct {
key string
posts []*model.PostFormated
}
type bigCacheIndexServant struct {
getIndexPosts indexPostsFunc
indexActionCh chan core.IndexActionT
cachePostsCh chan *postsEntry
cache *bigcache.BigCache
lastCacheResetTime time.Time
}
type localossServant struct {
savePath string
domain string
@ -72,13 +86,19 @@ func NewDataService(engine *gorm.DB, zinc *zinc.ZincClient) core.DataService {
}
// initialize CacheIndex if needed
ds.useCacheIndex = true
if conf.CfgIf("SimpleCacheIndex") {
ds.useCacheIndex = true
ds.cacheIndex = newSimpleCacheIndexServant(ds.getIndexPosts)
} else if conf.CfgIf("BigCacheIndex") {
ds.cacheIndex = newBigCacheIndexServant(ds.getIndexPosts)
} else {
ds.useCacheIndex = false
}
if ds.useCacheIndex {
logrus.Infof("use cache index service by %s for version: %s", ds.cacheIndex.Name(), ds.cacheIndex.Version())
}
return ds
}

@ -9,7 +9,7 @@ import (
func (d *dataServant) IndexPosts(userId int64, offset int, limit int) ([]*model.PostFormated, error) {
if d.useCacheIndex {
if posts, err := d.cacheIndex.IndexPosts(userId, offset, limit); err == nil {
logrus.Debugln("get index posts from cached")
logrus.Debugf("get index posts from cached by userId: %d", userId)
return posts, nil
}
}
@ -57,6 +57,7 @@ func (d *dataServant) MergePosts(posts []*model.Post) ([]*model.PostFormated, er
}
// getIndexPosts _userId保留未来使用
// TODO: 未来可能根据userId查询广场推文列表简单做到不同用户的主页都是不同的
func (d *dataServant) getIndexPosts(_userId int64, offset int, limit int) ([]*model.PostFormated, error) {
posts, err := (&model.Post{}).List(d.engine, &model.ConditionsT{
"visibility IN ?": []model.PostVisibleT{model.PostVisitPublic, model.PostVisitFriend},

@ -22,9 +22,10 @@ func GetPostList(c *gin.Context) {
q.Type = "tag"
}
userId, _ := userIdFrom(c)
if q.Query == "" && q.Type == "search" {
offset, limit := app.GetPageOffset(c)
posts, err := service.GetIndexPosts(offset, limit)
posts, err := service.GetIndexPosts(userId, offset, limit)
if err != nil {
logrus.Errorf("service.GetPostList err: %v\n", err)
response.ToErrorResponse(errcode.GetPostsFailed)

@ -570,6 +570,13 @@ func userFrom(c *gin.Context) (*model.User, bool) {
user, ok := u.(*model.User)
return user, ok
}
logrus.Debugln("user not exist")
return nil, false
}
func userIdFrom(c *gin.Context) (int64, bool) {
if u, exists := c.Get("UID"); exists {
uid, ok := u.(int64)
return uid, ok
}
return -1, false
}

@ -55,9 +55,6 @@ func NewRouter() *gin.Engine {
// 无鉴权路由组
noAuthApi := r.Group("/")
{
// 获取广场流
noAuthApi.GET("/posts", api.GetPostList)
// 获取动态详情
noAuthApi.GET("/post", api.GetPost)
@ -74,6 +71,9 @@ func NewRouter() *gin.Engine {
// 宽松鉴权路由组
looseApi := r.Group("/").Use(middleware.JwtLoose())
{
// 获取广场流
looseApi.GET("/posts", api.GetPostList)
// 获取用户动态列表
looseApi.GET("/user/posts", api.GetUserPosts)
}

@ -420,8 +420,8 @@ func GetPostContentByID(id int64) (*model.PostContent, error) {
return ds.GetPostContentByID(id)
}
func GetIndexPosts(offset int, limit int) ([]*model.PostFormated, error) {
return ds.IndexPosts(0, offset, limit)
func GetIndexPosts(userId int64, offset int, limit int) ([]*model.PostFormated, error) {
return ds.IndexPosts(userId, offset, limit)
}
func GetPostList(req *PostListReq) ([]*model.PostFormated, error) {

Loading…
Cancel
Save