diff --git a/README.md b/README.md index 962af52d..09943391 100644 --- a/README.md +++ b/README.md @@ -274,8 +274,9 @@ Usage of release/paopao-ce: * 数据库: MySQL/Sqlite3/PostgreSQL * 对象存储: AliOSS/MinIO/LocalOSS `LocalOSS` 提供使用本地目录文件作为对象存储的功能,仅用于开发调试环境; -* 缓存: Redis/SimpleCacheIndex - `SimpleCacheIndex`提供 广场文章列表 的缓存功能; +* 缓存: Redis/SimpleCacheIndex/BigCacheIndex + `SimpleCacheIndex`提供简单的 广场推文列表 的缓存功能; + `BigCacheIndex` 使用[BigCache](https://github.com/allegro/bigcache)缓存 广场推文列表,缓存每个用户每一页,简单做到千人千面; * 搜索: Zinc * 日志: LoggerFile/LoggerZinc `LoggerFile` 使用文件写日志; diff --git a/config.yaml.sample b/config.yaml.sample index 524cf9df..38fa5a8b 100644 --- a/config.yaml.sample +++ b/config.yaml.sample @@ -13,7 +13,7 @@ Server: # 服务设置 WriteTimeout: 60 Features: Default: ["Base", "MySQL", "Option", "LocalOSS", "LoggerFile"] - Develop: ["Base", "MySQL", "Option", "Sms", "AliOSS", "LoggerZinc"] + Develop: ["Base", "MySQL", "BigCacheIndex", "Sms", "AliOSS", "LoggerZinc"] Demo: ["Base", "MySQL", "Option", "Sms", "MinIO", "LoggerZinc"] Slim: ["Base", "Sqlite3", "LocalOSS", "LoggerFile"] Base: ["Zinc", "Redis", "Alipay",] @@ -31,6 +31,11 @@ SimpleCacheIndex: # 缓存泡泡广场消息流 CheckTickDuration: 60 # 循环自检查每多少秒一次 ExpireTickDuration: 300 # 每多少秒后强制过期缓存, 设置为0禁止强制使缓存过期 ActionQPS: 100 # 添加/删除/更新Post的QPS, 默认100,范围设置[10, 10000] +BigCacheIndex: # 使用BigCache缓存泡泡广场消息流 + MaxIndexPage: 1024 # 最大缓存页数,必须是2^n, 代表最大同时缓存多少页数据 + Verbose: False # 是否打印cache操作的log + ExpireInSecond: 300 # 多少秒(>0)后强制过期缓存 + UpdateQPS: 100 # 添加/删除/更新Post的QPS, 默认100 LoggerFile: # 使用File写日志 SavePath: data/paopao-ce/logs FileName: app diff --git a/go.mod b/go.mod index de091c49..b0b7b976 100644 --- a/go.mod +++ b/go.mod @@ -3,8 +3,10 @@ module github.com/rocboss/paopao-ce go 1.16 require ( + github.com/Masterminds/semver/v3 v3.1.1 github.com/afocus/captcha v0.0.0-20191010092841-4bd1f21c8868 github.com/aliyun/aliyun-oss-go-sdk v2.2.2+incompatible + github.com/allegro/bigcache/v3 v3.0.2 github.com/baiyubin/aliyun-sts-go-sdk v0.0.0-20180326062324-cfa1a18b161f // indirect github.com/dgrijalva/jwt-go v3.2.0+incompatible github.com/disintegration/imaging v1.6.2 diff --git a/go.sum b/go.sum index 29e4614b..904a427a 100644 --- a/go.sum +++ b/go.sum @@ -82,7 +82,10 @@ github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRF github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/aliyun/aliyun-oss-go-sdk v2.2.2+incompatible h1:9gWa46nstkJ9miBReJcN8Gq34cBFbzSpQZVVT9N09TM= github.com/aliyun/aliyun-oss-go-sdk v2.2.2+incompatible/go.mod h1:T/Aws4fEfogEE9v+HPhhw+CntffsBHJ8nXQCwKr0/g8= +github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156 h1:eMwmnE/GDgah4HI848JfFxHt+iPb26b4zyfspmqY0/8= github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156/go.mod h1:Cb/ax3seSYIx7SuZdm2G2xzfwmv3TPSk2ucNfQESPXM= +github.com/allegro/bigcache/v3 v3.0.2 h1:AKZCw+5eAaVyNTBmI2fgyPVJhHkdWder3O9IrprcQfI= +github.com/allegro/bigcache/v3 v3.0.2/go.mod h1:aPyh7jEvrog9zAwx5N7+JUQX5dZTSGpxF1LAR4dr35I= github.com/andreyvit/diff v0.0.0-20170406064948-c7f18ee00883/go.mod h1:rCTlJbsFo29Kk6CurOXKm700vrz8f0KW0JNfpkRJY/8= github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= github.com/apache/arrow/go/arrow v0.0.0-20191024131854-af6fa24be0db/go.mod h1:VTxUBvSJ3s3eHAg65PNgrsn5BtqCRPdmyXh6rAfdxN0= diff --git a/internal/conf/conf.go b/internal/conf/conf.go index 77b0c6bd..2e44f662 100644 --- a/internal/conf/conf.go +++ b/internal/conf/conf.go @@ -19,6 +19,7 @@ var ( ServerSetting *ServerSettingS AppSetting *AppSettingS SimpleCacheIndexSetting *SimpleCacheIndexSettingS + BigCacheIndexSetting *BigCacheIndexSettingS SmsJuheSetting *SmsJuheSettings AlipaySetting *AlipaySettingS ZincSetting *ZincSettingS @@ -47,6 +48,7 @@ func setupSetting(suite []string, noDefault bool) error { "App": &AppSetting, "Server": &ServerSetting, "SimpleCacheIndex": &SimpleCacheIndexSetting, + "BigCacheIndex": &BigCacheIndexSetting, "Alipay": &AlipaySetting, "SmsJuhe": &SmsJuheSetting, "LoggerFile": &loggerFileSetting, @@ -70,6 +72,10 @@ func setupSetting(suite []string, noDefault bool) error { JWTSetting.Expire *= time.Second ServerSetting.ReadTimeout *= time.Second ServerSetting.WriteTimeout *= time.Second + SimpleCacheIndexSetting.CheckTickDuration *= time.Second + SimpleCacheIndexSetting.ExpireTickDuration *= time.Second + BigCacheIndexSetting.ExpireInSecond *= time.Second + Mutex = &sync.Mutex{} return nil } diff --git a/internal/conf/settting.go b/internal/conf/settting.go index 33cb9131..182b43a5 100644 --- a/internal/conf/settting.go +++ b/internal/conf/settting.go @@ -49,11 +49,18 @@ type AppSettingS struct { type SimpleCacheIndexSettingS struct { MaxIndexSize int - CheckTickDuration int - ExpireTickDuration int + CheckTickDuration time.Duration + ExpireTickDuration time.Duration ActionQPS int } +type BigCacheIndexSettingS struct { + MaxIndexPage int + ExpireInSecond time.Duration + Verbose bool + UpdateQPS int +} + type AlipaySettingS struct { AppID string PrivateKey string diff --git a/internal/core/cache.go b/internal/core/cache.go index 07afbdb4..383f760e 100644 --- a/internal/core/cache.go +++ b/internal/core/cache.go @@ -32,6 +32,7 @@ func (a IndexActionT) String() string { // CacheIndexService cache index service interface type CacheIndexService interface { + VersionInfo IndexPostsService SendAction(active IndexActionT) } diff --git a/internal/core/version.go b/internal/core/version.go new file mode 100644 index 00000000..fd8bab2d --- /dev/null +++ b/internal/core/version.go @@ -0,0 +1,10 @@ +package core + +import ( + "github.com/Masterminds/semver/v3" +) + +type VersionInfo interface { + Name() string + Version() *semver.Version +} diff --git a/internal/dao/cache_index_big.go b/internal/dao/cache_index_big.go new file mode 100644 index 00000000..7b9615a9 --- /dev/null +++ b/internal/dao/cache_index_big.go @@ -0,0 +1,157 @@ +package dao + +import ( + "bytes" + "encoding/gob" + "fmt" + "time" + + "github.com/Masterminds/semver/v3" + "github.com/allegro/bigcache/v3" + "github.com/rocboss/paopao-ce/internal/conf" + "github.com/rocboss/paopao-ce/internal/core" + "github.com/rocboss/paopao-ce/internal/model" + "github.com/sirupsen/logrus" +) + +func newBigCacheIndexServant(getIndexPosts indexPostsFunc) *bigCacheIndexServant { + s := conf.BigCacheIndexSetting + + config := bigcache.DefaultConfig(s.ExpireInSecond) + config.Shards = s.MaxIndexPage + config.Verbose = s.Verbose + config.MaxEntrySize = 10000 + config.Logger = logrus.StandardLogger() + cache, err := bigcache.NewBigCache(config) + if err != nil { + logrus.Fatalf("initial bigCahceIndex failure by err: %v", err) + } + + cacheIndex := &bigCacheIndexServant{ + getIndexPosts: getIndexPosts, + cache: cache, + } + + // indexActionCh capacity custom configure by conf.yaml need in [10, 10000] + // or re-compile source to adjust min/max capacity + capacity := s.UpdateQPS + if capacity < 10 { + capacity = 10 + } else if capacity > 10000 { + capacity = 10000 + } + cacheIndex.indexActionCh = make(chan core.IndexActionT, capacity) + cacheIndex.cachePostsCh = make(chan *postsEntry, capacity) + + go cacheIndex.startIndexPosts() + + return cacheIndex +} + +func (s *bigCacheIndexServant) IndexPosts(userId int64, offset int, limit int) ([]*model.PostFormated, error) { + key := s.keyFrom(userId, offset, limit) + posts, err := s.getPosts(key) + if err == nil { + logrus.Debugf("get index posts from cache by key: %s userId: %d offset:%d limit:%d", key, userId, offset, limit) + return posts, nil + } + + if posts, err = s.getIndexPosts(userId, offset, limit); err != nil { + return nil, err + } + logrus.Debugf("get index posts from database by userId: %d offset:%d limit:%d", userId, offset, limit) + s.cachePosts(key, posts) + return posts, nil +} + +func (s *bigCacheIndexServant) getPosts(key string) ([]*model.PostFormated, error) { + data, err := s.cache.Get(key) + if err != nil { + logrus.Debugf("get posts by key: %s from cache err: %v", key, err) + return nil, err + } + buf := bytes.NewBuffer(data) + dec := gob.NewDecoder(buf) + var posts []*model.PostFormated + if err := dec.Decode(&posts); err != nil { + logrus.Debugf("get posts from cache in decode err: %v", err) + return nil, err + } + return posts, nil +} + +func (s *bigCacheIndexServant) cachePosts(key string, posts []*model.PostFormated) { + entry := &postsEntry{key: key, posts: posts} + select { + case s.cachePostsCh <- entry: + logrus.Debugf("send indexAction by chan of key: %s", key) + default: + go func(ch chan<- *postsEntry, entry *postsEntry) { + logrus.Debugf("send indexAction by goroutine of key: %s", key) + ch <- entry + }(s.cachePostsCh, entry) + } +} + +func (s *bigCacheIndexServant) setPosts(entry *postsEntry) { + var buf bytes.Buffer + enc := gob.NewEncoder(&buf) + if err := enc.Encode(entry.posts); err != nil { + logrus.Debugf("setPosts encode post entry err: %v", err) + return + } + if err := s.cache.Set(entry.key, buf.Bytes()); err != nil { + logrus.Debugf("setPosts set cache err: %v", err) + } + logrus.Debugf("setPosts set cache by key: %s", entry.key) +} + +func (s *bigCacheIndexServant) keyFrom(userId int64, offset int, limit int) string { + return fmt.Sprintf("index:%d:%d:%d", userId, offset, limit) +} + +func (s *bigCacheIndexServant) SendAction(act core.IndexActionT) { + select { + case s.indexActionCh <- act: + logrus.Debugf("send indexAction by chan: %s", act) + default: + go func(ch chan<- core.IndexActionT, act core.IndexActionT) { + logrus.Debugf("send indexAction by goroutine: %s", act) + ch <- act + }(s.indexActionCh, act) + } +} + +func (s *bigCacheIndexServant) startIndexPosts() { + for { + select { + case entry := <-s.cachePostsCh: + s.setPosts(entry) + case action := <-s.indexActionCh: + switch action { + // TODO: 这里列出来是因为后续可能会精细化处理每种情况 + case core.IdxActCreatePost, + core.IdxActUpdatePost, + core.IdxActDeletePost, + core.IdxActStickPost, + core.IdxActVisiblePost: + // TODO: 粗糙处理cache,后续需要针对每一种情况精细化处理 + if time.Since(s.lastCacheResetTime) > time.Minute { + s.cache.Reset() + s.lastCacheResetTime = time.Now() + logrus.Debugf("reset cache by %s", action) + } + default: + // nop + } + } + } +} + +func (s *bigCacheIndexServant) Name() string { + return "BigCacheIndex" +} + +func (s *bigCacheIndexServant) Version() *semver.Version { + return semver.MustParse("v0.1.0") +} diff --git a/internal/dao/cache_index.go b/internal/dao/cache_index_simple.go similarity index 88% rename from internal/dao/cache_index.go rename to internal/dao/cache_index_simple.go index adbd9fb2..f6c44b64 100644 --- a/internal/dao/cache_index.go +++ b/internal/dao/cache_index_simple.go @@ -4,6 +4,7 @@ import ( "errors" "time" + "github.com/Masterminds/semver/v3" "github.com/rocboss/paopao-ce/internal/conf" "github.com/rocboss/paopao-ce/internal/core" "github.com/rocboss/paopao-ce/internal/model" @@ -20,13 +21,13 @@ func newSimpleCacheIndexServant(getIndexPosts indexPostsFunc) *simpleCacheIndexS getIndexPosts: getIndexPosts, maxIndexSize: s.MaxIndexSize, indexPosts: make([]*model.PostFormated, 0), - checkTick: time.NewTicker(time.Duration(s.CheckTickDuration) * time.Second), // check whether need update index every 1 minute + checkTick: time.NewTicker(s.CheckTickDuration), // check whether need update index every 1 minute expireIndexTick: time.NewTicker(time.Second), } // force expire index every ExpireTickDuration second if s.ExpireTickDuration != 0 { - cacheIndex.expireIndexTick.Reset(time.Duration(s.CheckTickDuration) * time.Second) + cacheIndex.expireIndexTick.Reset(s.CheckTickDuration) } else { cacheIndex.expireIndexTick.Stop() } @@ -110,3 +111,11 @@ func (s *simpleCacheIndexServant) startIndexPosts() { } } } + +func (s *simpleCacheIndexServant) Name() string { + return "SimpleCacheIndex" +} + +func (s *simpleCacheIndexServant) Version() *semver.Version { + return semver.MustParse("v0.1.0") +} diff --git a/internal/dao/dao.go b/internal/dao/dao.go index fb82b71c..1c1f9886 100644 --- a/internal/dao/dao.go +++ b/internal/dao/dao.go @@ -5,6 +5,7 @@ import ( "time" "github.com/aliyun/aliyun-oss-go-sdk/oss" + "github.com/allegro/bigcache/v3" "github.com/minio/minio-go/v7" "github.com/rocboss/paopao-ce/internal/conf" "github.com/rocboss/paopao-ce/internal/core" @@ -22,6 +23,7 @@ var ( _ core.ObjectStorageService = (*localossServant)(nil) _ core.AttachmentCheckService = (*attachmentCheckServant)(nil) _ core.CacheIndexService = (*simpleCacheIndexServant)(nil) + _ core.CacheIndexService = (*bigCacheIndexServant)(nil) ) type dataServant struct { @@ -43,6 +45,18 @@ type simpleCacheIndexServant struct { expireIndexTick *time.Ticker } +type postsEntry struct { + key string + posts []*model.PostFormated +} +type bigCacheIndexServant struct { + getIndexPosts indexPostsFunc + indexActionCh chan core.IndexActionT + cachePostsCh chan *postsEntry + cache *bigcache.BigCache + lastCacheResetTime time.Time +} + type localossServant struct { savePath string domain string @@ -72,13 +86,19 @@ func NewDataService(engine *gorm.DB, zinc *zinc.ZincClient) core.DataService { } // initialize CacheIndex if needed + ds.useCacheIndex = true if conf.CfgIf("SimpleCacheIndex") { - ds.useCacheIndex = true ds.cacheIndex = newSimpleCacheIndexServant(ds.getIndexPosts) + } else if conf.CfgIf("BigCacheIndex") { + ds.cacheIndex = newBigCacheIndexServant(ds.getIndexPosts) } else { ds.useCacheIndex = false } + if ds.useCacheIndex { + logrus.Infof("use cache index service by %s for version: %s", ds.cacheIndex.Name(), ds.cacheIndex.Version()) + } + return ds } diff --git a/internal/dao/post_index.go b/internal/dao/post_index.go index 4353e938..4802f1c8 100644 --- a/internal/dao/post_index.go +++ b/internal/dao/post_index.go @@ -9,7 +9,7 @@ import ( func (d *dataServant) IndexPosts(userId int64, offset int, limit int) ([]*model.PostFormated, error) { if d.useCacheIndex { if posts, err := d.cacheIndex.IndexPosts(userId, offset, limit); err == nil { - logrus.Debugln("get index posts from cached") + logrus.Debugf("get index posts from cached by userId: %d", userId) return posts, nil } } @@ -57,6 +57,7 @@ func (d *dataServant) MergePosts(posts []*model.Post) ([]*model.PostFormated, er } // getIndexPosts _userId保留未来使用 +// TODO: 未来可能根据userId查询广场推文列表,简单做到不同用户的主页都是不同的; func (d *dataServant) getIndexPosts(_userId int64, offset int, limit int) ([]*model.PostFormated, error) { posts, err := (&model.Post{}).List(d.engine, &model.ConditionsT{ "visibility IN ?": []model.PostVisibleT{model.PostVisitPublic, model.PostVisitFriend}, diff --git a/internal/routers/api/post.go b/internal/routers/api/post.go index af352afb..0f6f62a8 100644 --- a/internal/routers/api/post.go +++ b/internal/routers/api/post.go @@ -22,9 +22,10 @@ func GetPostList(c *gin.Context) { q.Type = "tag" } + userId, _ := userIdFrom(c) if q.Query == "" && q.Type == "search" { offset, limit := app.GetPageOffset(c) - posts, err := service.GetIndexPosts(offset, limit) + posts, err := service.GetIndexPosts(userId, offset, limit) if err != nil { logrus.Errorf("service.GetPostList err: %v\n", err) response.ToErrorResponse(errcode.GetPostsFailed) diff --git a/internal/routers/api/user.go b/internal/routers/api/user.go index 9529a805..1b4e1807 100644 --- a/internal/routers/api/user.go +++ b/internal/routers/api/user.go @@ -570,6 +570,13 @@ func userFrom(c *gin.Context) (*model.User, bool) { user, ok := u.(*model.User) return user, ok } - logrus.Debugln("user not exist") return nil, false } + +func userIdFrom(c *gin.Context) (int64, bool) { + if u, exists := c.Get("UID"); exists { + uid, ok := u.(int64) + return uid, ok + } + return -1, false +} diff --git a/internal/routers/router.go b/internal/routers/router.go index 54ffb90d..80012733 100644 --- a/internal/routers/router.go +++ b/internal/routers/router.go @@ -55,9 +55,6 @@ func NewRouter() *gin.Engine { // 无鉴权路由组 noAuthApi := r.Group("/") { - // 获取广场流 - noAuthApi.GET("/posts", api.GetPostList) - // 获取动态详情 noAuthApi.GET("/post", api.GetPost) @@ -74,6 +71,9 @@ func NewRouter() *gin.Engine { // 宽松鉴权路由组 looseApi := r.Group("/").Use(middleware.JwtLoose()) { + // 获取广场流 + looseApi.GET("/posts", api.GetPostList) + // 获取用户动态列表 looseApi.GET("/user/posts", api.GetUserPosts) } diff --git a/internal/service/post.go b/internal/service/post.go index c2193eee..0f3cf28f 100644 --- a/internal/service/post.go +++ b/internal/service/post.go @@ -420,8 +420,8 @@ func GetPostContentByID(id int64) (*model.PostContent, error) { return ds.GetPostContentByID(id) } -func GetIndexPosts(offset int, limit int) ([]*model.PostFormated, error) { - return ds.IndexPosts(0, offset, limit) +func GetIndexPosts(userId int64, offset int, limit int) ([]*model.PostFormated, error) { + return ds.IndexPosts(userId, offset, limit) } func GetPostList(req *PostListReq) ([]*model.PostFormated, error) {