From d23d40fa9d6a61fd220dfee69311c6f5db1cb463 Mon Sep 17 00:00:00 2001 From: Michael Li Date: Sun, 2 Apr 2023 19:17:32 +0800 Subject: [PATCH 1/2] update CHANGELOG.md --- CHANGELOG.md | 22 ++++++++++++++++++++++ ROADMAP.md | 14 ++++++++++++-- internal/dao/cache/redis.go | 4 ++++ 3 files changed, 38 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 709d4457..52bb58da 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,28 @@ All notable changes to paopao-ce are documented in this file. ## 0.3.0+dev ([`dev`](https://github.com/rocboss/paopao-ce/tree/dev)) +### Changed + +- use [github.com/rueian/rueidis](https://github.com/rueian/rueidis) as Redis client [#249](https://github.com/rocboss/paopao-ce/pull/249) + the **Old** redis client configure field + ```yaml + ... + Redis: + Host: redis:6379 + Password: + DB: + ``` + the **New** redis client configure field + ```yaml + ... + Redis: + InitAddress: + - redis:6379 + Username: + Password: + SelectDB: + ConnWriteTimeout: 60 # 连接写超时时间 多少秒 默认 60秒 + ``` ## 0.2.3 diff --git a/ROADMAP.md b/ROADMAP.md index 7ef7c15b..66e918f5 100644 --- a/ROADMAP.md +++ b/ROADMAP.md @@ -9,12 +9,12 @@ * [ ] add extend base ORM code for implement data logic base sqlx/sqlc * [ ] optimize media tweet submit logic * [ ] optimize search logic service -* [ ] remove `Deprecated:OldWeb` feature - +* [x] remove `Deprecated:OldWeb` feature #### v0.2.0 * [x] add `Friendship` feature * [x] add `Lightship` feature +* [ ] add extend base ORM code for implement data logic base sqlx/sqlc * [x] add `Pyroscope` feature * [x] add new `Web` service * [x] add `Frontend:Web` feature @@ -42,6 +42,15 @@ * [ ] optimize topics service * [ ] optimize backend data logic service(optimize database CRUD operate) +<<<<<<< HEAD +### paopao-plus roadmap +#### v0.3.0 +* [ ] adapt for paopao-ce v0.3.0 + +### paopao-pro roadmap +#### v0.3.0 +* [ ] adapt for paopao-ce v0.3.0 +======= ## paopao-ce-plus roadmap #### paopao-ce-plus/v0.3.0 * [ ] adapt for paopao-ce v0.3.0 @@ -55,3 +64,4 @@ #### paopao-ce-pro/v0.2.0 * [ ] adapt for paopao-ce v0.2.0 +>>>>>>> x/sqlc diff --git a/internal/dao/cache/redis.go b/internal/dao/cache/redis.go index 1b27b77d..2d626d46 100644 --- a/internal/dao/cache/redis.go +++ b/internal/dao/cache/redis.go @@ -1,3 +1,7 @@ +// Copyright 2023 ROC. All rights reserved. +// Use of this source code is governed by a MIT style +// license that can be found in the LICENSE file. + package cache import ( From 5cb30bb949bb823726bd3fe75cdec37327a056ee Mon Sep 17 00:00:00 2001 From: Michael Li Date: Mon, 3 Apr 2023 07:36:50 +0800 Subject: [PATCH 2/2] add RedisCacheIndex feature to support use Redis as Timeline Index cache --- CHANGELOG.md | 4 + README.md | 1 + ROADMAP.md | 1 + features-status.md | 4 + internal/conf/conf.go | 3 + internal/conf/config.yaml | 3 + internal/conf/settting.go | 5 + internal/dao/cache/base.go | 229 +++++++++++++++++++++++++++++++++ internal/dao/cache/bigcache.go | 178 +++---------------------- internal/dao/cache/cache.go | 35 ++--- internal/dao/cache/redis.go | 53 +++++++- internal/dao/jinzhu/jinzhu.go | 25 ++-- 12 files changed, 346 insertions(+), 195 deletions(-) create mode 100644 internal/dao/cache/base.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 52bb58da..e37ea238 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,10 @@ All notable changes to paopao-ce are documented in this file. ## 0.3.0+dev ([`dev`](https://github.com/rocboss/paopao-ce/tree/dev)) + +### Added +- add `RedisCacheIndex` feature [#250](https://github.com/rocboss/paopao-ce/pull/250) + ### Changed - use [github.com/rueian/rueidis](https://github.com/rueian/rueidis) as Redis client [#249](https://github.com/rocboss/paopao-ce/pull/249) diff --git a/README.md b/README.md index 2df383c4..efc60c19 100644 --- a/README.md +++ b/README.md @@ -360,6 +360,7 @@ release/paopao-ce --no-default-features --features sqlite3,localoss,loggerfile,r |`Redis` | 缓存 | 稳定 | Redis缓存功能 | |`SimpleCacheIndex` | 缓存 | Deprecated | 提供简单的 广场推文列表 的缓存功能 | |`BigCacheIndex` | 缓存 | 稳定(推荐) | 使用[BigCache](https://github.com/allegro/bigcache)缓存 广场推文列表,缓存每个用户每一页,简单做到千人千面 | +|`RedisCacheIndex` | 缓存 | 内测(推荐) | 使用Redis缓存 广场推文列表,缓存每个用户每一页,简单做到千人千面 | |`Zinc` | 搜索 | 稳定(推荐) | 基于[Zinc](https://github.com/zinclabs/zinc)搜索引擎提供推文搜索服务 | |`Meili` | 搜索 | 稳定(推荐) | 基于[Meilisearch](https://github.com/meilisearch/meilisearch)搜索引擎提供推文搜索服务 | |`Bleve` | 搜索 | WIP | 基于[Bleve](https://github.com/blevesearch/bleve)搜索引擎提供推文搜索服务 | diff --git a/ROADMAP.md b/ROADMAP.md index 66e918f5..9ff44496 100644 --- a/ROADMAP.md +++ b/ROADMAP.md @@ -6,6 +6,7 @@ * [ ] add `Followship` feature * [ ] add `Auth:Bcrypt` feature * [ ] add `Auth:MD5` feature (just for compatible) +* [x] add `RedisCacheIndex` feature * [ ] add extend base ORM code for implement data logic base sqlx/sqlc * [ ] optimize media tweet submit logic * [ ] optimize search logic service diff --git a/features-status.md b/features-status.md index 132121da..a8739f0d 100644 --- a/features-status.md +++ b/features-status.md @@ -105,6 +105,10 @@ * [ ] 提按文档 * [x] 接口定义 * [x] 业务逻辑实现 +* `RedisCacheIndex` 使用Redis缓存 广场推文列表,缓存每个用户每一页,简单做到千人千面(目前状态: 内测阶段,推荐使用); + * [ ] 提按文档 + * [x] 接口定义 + * [x] 业务逻辑实现 #### 搜索: * `Zinc` 基于[Zinc](https://github.com/zinclabs/zinc)搜索引擎提供推文搜索服务(目前状态: 稳定,推荐使用); diff --git a/internal/conf/conf.go b/internal/conf/conf.go index 5a2b0a1f..072e91c6 100644 --- a/internal/conf/conf.go +++ b/internal/conf/conf.go @@ -37,6 +37,7 @@ var ( CacheIndexSetting *CacheIndexSettingS SimpleCacheIndexSetting *SimpleCacheIndexSettingS BigCacheIndexSetting *BigCacheIndexSettingS + RedisCacheIndexSetting *RedisCacheIndexSettingS SmsJuheSetting *SmsJuheSettings AlipaySetting *AlipaySettingS TweetSearchSetting *TweetSearchS @@ -80,6 +81,7 @@ func setupSetting(suite []string, noDefault bool) error { "CacheIndex": &CacheIndexSetting, "SimpleCacheIndex": &SimpleCacheIndexSetting, "BigCacheIndex": &BigCacheIndexSetting, + "RedisCacheIndex": &RedisCacheIndexSetting, "Alipay": &AlipaySetting, "SmsJuhe": &SmsJuheSetting, "Pyroscope": &PyroscopeSetting, @@ -112,6 +114,7 @@ func setupSetting(suite []string, noDefault bool) error { SimpleCacheIndexSetting.CheckTickDuration *= time.Second SimpleCacheIndexSetting.ExpireTickDuration *= time.Second BigCacheIndexSetting.ExpireInSecond *= time.Second + RedisCacheIndexSetting.ExpireInSecond *= time.Second redisSetting.ConnWriteTimeout *= time.Second Mutex = &sync.Mutex{} diff --git a/internal/conf/config.yaml b/internal/conf/config.yaml index 71688934..b1e1f020 100644 --- a/internal/conf/config.yaml +++ b/internal/conf/config.yaml @@ -73,6 +73,9 @@ BigCacheIndex: # 使用BigCache缓存泡泡广场消息流 HardMaxCacheSize: 256 # 最大缓存大小(MB),0表示无限制 Verbose: False # 是否打印cache操作的log ExpireInSecond: 300 # 多少秒(>0)后强制过期缓存 +RedisCacheIndex: # 使用Redis缓存泡泡广场消息流 + Verbose: False # 是否打印cache操作的log + ExpireInSecond: 300 # 多少秒(>0)后强制过期缓存 Pyroscope: # Pyroscope配置 AppName: "paopao-ce" # application name Endpoint: "http://localhost:4040" # Pyroscope server address diff --git a/internal/conf/settting.go b/internal/conf/settting.go index 0ffda834..3766b51a 100644 --- a/internal/conf/settting.go +++ b/internal/conf/settting.go @@ -97,6 +97,11 @@ type BigCacheIndexSettingS struct { Verbose bool } +type RedisCacheIndexSettingS struct { + ExpireInSecond time.Duration + Verbose bool +} + type AlipaySettingS struct { AppID string PrivateKey string diff --git a/internal/dao/cache/base.go b/internal/dao/cache/base.go new file mode 100644 index 00000000..f550e126 --- /dev/null +++ b/internal/dao/cache/base.go @@ -0,0 +1,229 @@ +// Copyright 2023 ROC. All rights reserved. +// Use of this source code is governed by a MIT style +// license that can be found in the LICENSE file. + +package cache + +import ( + "bytes" + "encoding/gob" + "fmt" + "strconv" + "strings" + "time" + + "github.com/Masterminds/semver/v3" + "github.com/rocboss/paopao-ce/internal/conf" + "github.com/rocboss/paopao-ce/internal/core" + "github.com/rocboss/paopao-ce/pkg/types" + "github.com/sirupsen/logrus" +) + +const ( + _cacheIndexKey = "paopao_index" +) + +var ( + _ core.CacheIndexService = (*cacheIndexSrv)(nil) + _ core.VersionInfo = (*cacheIndexSrv)(nil) +) + +type postsEntry struct { + key string + tweets *core.IndexTweetList +} + +type tweetsCache interface { + core.VersionInfo + getTweetsBytes(key string) ([]byte, error) + setTweetsBytes(key string, bs []byte) error + delTweets(keys []string) error + allKeys() ([]string, error) +} + +type cacheIndexSrv struct { + ips core.IndexPostsService + ams core.AuthorizationManageService + + name string + version *semver.Version + indexActionCh chan *core.IndexAction + cachePostsCh chan *postsEntry + cache tweetsCache + lastCacheResetTime time.Time + preventDuration time.Duration +} + +func (s *cacheIndexSrv) IndexPosts(user *core.User, offset int, limit int) (*core.IndexTweetList, error) { + key := s.keyFrom(user, offset, limit) + posts, err := s.getPosts(key) + if err == nil { + logrus.Debugf("cacheIndexSrv.IndexPosts get index posts from cache by key: %s", key) + return posts, nil + } + + if posts, err = s.ips.IndexPosts(user, offset, limit); err != nil { + return nil, err + } + logrus.Debugf("cacheIndexSrv.IndexPosts get index posts from database by key: %s", key) + s.cachePosts(key, posts) + return posts, nil +} + +func (s *cacheIndexSrv) getPosts(key string) (*core.IndexTweetList, error) { + data, err := s.cache.getTweetsBytes(key) + if err != nil { + logrus.Debugf("cacheIndexSrv.getPosts get posts by key: %s from cache err: %v", key, err) + return nil, err + } + buf := bytes.NewBuffer(data) + dec := gob.NewDecoder(buf) + var resp core.IndexTweetList + if err := dec.Decode(&resp); err != nil { + logrus.Debugf("cacheIndexSrv.getPosts get posts from cache in decode err: %v", err) + return nil, err + } + return &resp, nil +} + +func (s *cacheIndexSrv) cachePosts(key string, tweets *core.IndexTweetList) { + entry := &postsEntry{key: key, tweets: tweets} + select { + case s.cachePostsCh <- entry: + logrus.Debugf("cacheIndexSrv.cachePosts cachePosts by chan of key: %s", key) + default: + go func(ch chan<- *postsEntry, entry *postsEntry) { + logrus.Debugf("cacheIndexSrv.cachePosts cachePosts indexAction by goroutine of key: %s", key) + ch <- entry + }(s.cachePostsCh, entry) + } +} + +func (s *cacheIndexSrv) setPosts(entry *postsEntry) { + var buf bytes.Buffer + enc := gob.NewEncoder(&buf) + if err := enc.Encode(entry.tweets); err != nil { + logrus.Debugf("cacheIndexSrv.setPosts setPosts encode post entry err: %v", err) + return + } + if err := s.cache.setTweetsBytes(entry.key, buf.Bytes()); err != nil { + logrus.Debugf("cacheIndexSrv.setPosts setPosts set cache err: %v", err) + } + logrus.Debugf("cacheIndexSrv.setPosts setPosts set cache by key: %s", entry.key) +} + +func (s *cacheIndexSrv) keyFrom(user *core.User, offset int, limit int) string { + var userId int64 = -1 + if user != nil { + userId = user.ID + } + return fmt.Sprintf("%s:%d:%d:%d", _cacheIndexKey, userId, offset, limit) +} + +func (s *cacheIndexSrv) SendAction(act core.IdxAct, post *core.Post) { + action := core.NewIndexAction(act, post) + select { + case s.indexActionCh <- action: + logrus.Debugf("cacheIndexSrv.SendAction send indexAction by chan: %s", act) + default: + go func(ch chan<- *core.IndexAction, act *core.IndexAction) { + logrus.Debugf("cacheIndexSrv.SendAction send indexAction by goroutine: %s", action.Act) + ch <- act + }(s.indexActionCh, action) + } +} + +func (s *cacheIndexSrv) startIndexPosts() { + for { + select { + case entry := <-s.cachePostsCh: + s.setPosts(entry) + case action := <-s.indexActionCh: + s.handleIndexAction(action) + } + } +} + +func (s *cacheIndexSrv) handleIndexAction(action *core.IndexAction) { + act, post := action.Act, action.Post + + // 创建/删除 私密推文特殊处理 + switch act { + case core.IdxActCreatePost, core.IdxActDeletePost: + if post.Visibility == core.PostVisitPrivate { + s.deleteCacheByUserId(post.UserID, true) + return + } + } + + // 如果在s.preventDuration时间内就清除所有缓存,否则只清除自个儿的缓存 + // TODO: 需要优化只清除受影响的缓存,后续完善 + if time.Since(s.lastCacheResetTime) > s.preventDuration { + s.deleteCacheByUserId(post.UserID, false) + } else { + s.deleteCacheByUserId(post.UserID, true) + } +} + +func (s *cacheIndexSrv) deleteCacheByUserId(id int64, oneself bool) { + var keys []string + userId := strconv.FormatInt(id, 10) + friendSet := core.FriendSet{} + if !oneself { + friendSet = s.ams.MyFriendSet(id) + } + friendSet[userId] = types.Empty{} + + // 获取需要删除缓存的key,目前是仅删除自个儿的缓存 + allKeys, err := s.cache.allKeys() + if err != nil { + logrus.Debugf("cacheIndexSrv.deleteCacheByUserId userId: %s err:%s", userId, err) + } + for _, key := range allKeys { + keyParts := strings.Split(key, ":") + if len(keyParts) > 2 && keyParts[0] == "index" { + if _, ok := friendSet[keyParts[1]]; ok { + keys = append(keys, key) + } + } + } + + // 执行删缓存 + s.cache.delTweets(keys) + s.lastCacheResetTime = time.Now() + logrus.Debugf("cacheIndexSrv.deleteCacheByUserId userId:%s oneself:%t keys:%d", userId, oneself, len(keys)) +} + +func (s *cacheIndexSrv) Name() string { + return s.name +} + +func (s *cacheIndexSrv) Version() *semver.Version { + return s.version +} + +func newCacheIndexSrv(ips core.IndexPostsService, ams core.AuthorizationManageService, tc tweetsCache) *cacheIndexSrv { + cacheIndex := &cacheIndexSrv{ + ips: ips, + ams: ams, + cache: tc, + name: tc.Name(), + version: tc.Version(), + preventDuration: 10 * time.Second, + } + + // indexActionCh capacity custom configure by conf.yaml need in [10, 10000] + // or re-compile source to adjust min/max capacity + capacity := conf.CacheIndexSetting.MaxUpdateQPS + if capacity < 10 { + capacity = 10 + } else if capacity > 10000 { + capacity = 10000 + } + cacheIndex.indexActionCh = make(chan *core.IndexAction, capacity) + cacheIndex.cachePostsCh = make(chan *postsEntry, capacity) + // 启动索引更新器 + go cacheIndex.startIndexPosts() + + return cacheIndex +} diff --git a/internal/dao/cache/bigcache.go b/internal/dao/cache/bigcache.go index 8ec83c38..202627e3 100644 --- a/internal/dao/cache/bigcache.go +++ b/internal/dao/cache/bigcache.go @@ -5,189 +5,51 @@ package cache import ( - "bytes" - "encoding/gob" - "fmt" - "strconv" - "strings" - "time" - "github.com/Masterminds/semver/v3" "github.com/allegro/bigcache/v3" - "github.com/rocboss/paopao-ce/internal/core" - "github.com/rocboss/paopao-ce/pkg/types" - "github.com/sirupsen/logrus" ) var ( - _ core.CacheIndexService = (*bigCacheIndexServant)(nil) - _ core.VersionInfo = (*bigCacheIndexServant)(nil) + _ tweetsCache = (*bigCacheTweetsCache)(nil) ) -type postsEntry struct { - key string - tweets *core.IndexTweetList +type bigCacheTweetsCache struct { + name string + version *semver.Version + bc *bigcache.BigCache } -type bigCacheIndexServant struct { - ips core.IndexPostsService - ams core.AuthorizationManageService - - indexActionCh chan *core.IndexAction - cachePostsCh chan *postsEntry - cache *bigcache.BigCache - lastCacheResetTime time.Time - preventDuration time.Duration -} - -func (s *bigCacheIndexServant) IndexPosts(user *core.User, offset int, limit int) (*core.IndexTweetList, error) { - key := s.keyFrom(user, offset, limit) - posts, err := s.getPosts(key) - if err == nil { - logrus.Debugf("bigCacheIndexServant.IndexPosts get index posts from cache by key: %s", key) - return posts, nil - } - - if posts, err = s.ips.IndexPosts(user, offset, limit); err != nil { - return nil, err - } - logrus.Debugf("bigCacheIndexServant.IndexPosts get index posts from database by key: %s", key) - s.cachePosts(key, posts) - return posts, nil +func (s *bigCacheTweetsCache) getTweetsBytes(key string) ([]byte, error) { + return s.bc.Get(key) } -func (s *bigCacheIndexServant) getPosts(key string) (*core.IndexTweetList, error) { - data, err := s.cache.Get(key) - if err != nil { - logrus.Debugf("bigCacheIndexServant.getPosts get posts by key: %s from cache err: %v", key, err) - return nil, err - } - buf := bytes.NewBuffer(data) - dec := gob.NewDecoder(buf) - var resp core.IndexTweetList - if err := dec.Decode(&resp); err != nil { - logrus.Debugf("bigCacheIndexServant.getPosts get posts from cache in decode err: %v", err) - return nil, err - } - return &resp, nil -} - -func (s *bigCacheIndexServant) cachePosts(key string, tweets *core.IndexTweetList) { - entry := &postsEntry{key: key, tweets: tweets} - select { - case s.cachePostsCh <- entry: - logrus.Debugf("bigCacheIndexServant.cachePosts cachePosts by chan of key: %s", key) - default: - go func(ch chan<- *postsEntry, entry *postsEntry) { - logrus.Debugf("bigCacheIndexServant.cachePosts cachePosts indexAction by goroutine of key: %s", key) - ch <- entry - }(s.cachePostsCh, entry) - } -} - -func (s *bigCacheIndexServant) setPosts(entry *postsEntry) { - var buf bytes.Buffer - enc := gob.NewEncoder(&buf) - if err := enc.Encode(entry.tweets); err != nil { - logrus.Debugf("bigCacheIndexServant.setPosts setPosts encode post entry err: %v", err) - return - } - if err := s.cache.Set(entry.key, buf.Bytes()); err != nil { - logrus.Debugf("bigCacheIndexServant.setPosts setPosts set cache err: %v", err) - } - logrus.Debugf("bigCacheIndexServant.setPosts setPosts set cache by key: %s", entry.key) -} - -func (s *bigCacheIndexServant) keyFrom(user *core.User, offset int, limit int) string { - var userId int64 = -1 - if user != nil { - userId = user.ID - } - return fmt.Sprintf("index:%d:%d:%d", userId, offset, limit) +func (s *bigCacheTweetsCache) setTweetsBytes(key string, bs []byte) error { + return s.bc.Set(key, bs) } -func (s *bigCacheIndexServant) SendAction(act core.IdxAct, post *core.Post) { - action := core.NewIndexAction(act, post) - select { - case s.indexActionCh <- action: - logrus.Debugf("bigCacheIndexServant.SendAction send indexAction by chan: %s", act) - default: - go func(ch chan<- *core.IndexAction, act *core.IndexAction) { - logrus.Debugf("bigCacheIndexServant.SendAction send indexAction by goroutine: %s", action.Act) - ch <- act - }(s.indexActionCh, action) - } -} - -func (s *bigCacheIndexServant) startIndexPosts() { - for { - select { - case entry := <-s.cachePostsCh: - s.setPosts(entry) - case action := <-s.indexActionCh: - s.handleIndexAction(action) - } - } -} - -func (s *bigCacheIndexServant) handleIndexAction(action *core.IndexAction) { - act, post := action.Act, action.Post - - // 创建/删除 私密推文特殊处理 - switch act { - case core.IdxActCreatePost, core.IdxActDeletePost: - if post.Visibility == core.PostVisitPrivate { - s.deleteCacheByUserId(post.UserID, true) - return - } - } - - // 如果在s.preventDuration时间内就清除所有缓存,否则只清除自个儿的缓存 - // TODO: 需要优化只清除受影响的缓存,后续完善 - if time.Since(s.lastCacheResetTime) > s.preventDuration { - s.deleteCacheByUserId(post.UserID, false) - } else { - s.deleteCacheByUserId(post.UserID, true) +func (s *bigCacheTweetsCache) delTweets(keys []string) error { + for _, k := range keys { + s.bc.Delete(k) } + return nil } -func (s *bigCacheIndexServant) deleteCacheByUserId(id int64, oneself bool) { +func (s *bigCacheTweetsCache) allKeys() ([]string, error) { var keys []string - userId := strconv.FormatInt(id, 10) - friendSet := core.FriendSet{} - if !oneself { - friendSet = s.ams.MyFriendSet(id) - } - friendSet[userId] = types.Empty{} - - // 获取需要删除缓存的key,目前是仅删除自个儿的缓存 - for it := s.cache.Iterator(); it.SetNext(); { + for it := s.bc.Iterator(); it.SetNext(); { entry, err := it.Value() if err != nil { - logrus.Debugf("bigCacheIndexServant.deleteCacheByUserId userId: %s err:%s", userId, err) - return + return nil, err } - key := entry.Key() - keyParts := strings.Split(key, ":") - if len(keyParts) > 2 && keyParts[0] == "index" { - if _, ok := friendSet[keyParts[1]]; ok { - keys = append(keys, key) - } - } - } - - // 执行删缓存 - for _, k := range keys { - s.cache.Delete(k) + keys = append(keys, entry.Key()) } - s.lastCacheResetTime = time.Now() - logrus.Debugf("bigCacheIndexServant.deleteCacheByUserId userId:%s oneself:%t keys:%d", userId, oneself, len(keys)) + return keys, nil } -func (s *bigCacheIndexServant) Name() string { +func (s *bigCacheTweetsCache) Name() string { return "BigCacheIndex" } -func (s *bigCacheIndexServant) Version() *semver.Version { +func (s *bigCacheTweetsCache) Version() *semver.Version { return semver.MustParse("v0.2.0") } diff --git a/internal/dao/cache/cache.go b/internal/dao/cache/cache.go index 74868551..6c97d2ec 100644 --- a/internal/dao/cache/cache.go +++ b/internal/dao/cache/cache.go @@ -27,32 +27,23 @@ func NewBigCacheIndexService(ips core.IndexPostsService, ams core.AuthorizationM c.Verbose = s.Verbose c.MaxEntrySize = 10000 c.Logger = logrus.StandardLogger() - cache, err := bigcache.NewBigCache(c) + + bc, err := bigcache.NewBigCache(c) if err != nil { logrus.Fatalf("initial bigCahceIndex failure by err: %v", err) } + cacheIndex := newCacheIndexSrv(ips, ams, &bigCacheTweetsCache{ + bc: bc, + }) + return cacheIndex, cacheIndex +} - cacheIndex := &bigCacheIndexServant{ - ips: ips, - ams: ams, - cache: cache, - preventDuration: 10 * time.Second, - } - - // indexActionCh capacity custom configure by conf.yaml need in [10, 10000] - // or re-compile source to adjust min/max capacity - capacity := conf.CacheIndexSetting.MaxUpdateQPS - if capacity < 10 { - capacity = 10 - } else if capacity > 10000 { - capacity = 10000 - } - cacheIndex.indexActionCh = make(chan *core.IndexAction, capacity) - cacheIndex.cachePostsCh = make(chan *postsEntry, capacity) - - // 启动索引更新器 - go cacheIndex.startIndexPosts() - +func NewRedisCacheIndexService(ips core.IndexPostsService, ams core.AuthorizationManageService) (core.CacheIndexService, core.VersionInfo) { + cacheIndex := newCacheIndexSrv(ips, ams, &redisCacheTweetsCache{ + expireDuration: conf.RedisCacheIndexSetting.ExpireInSecond, + expireInSecond: int64(conf.RedisCacheIndexSetting.ExpireInSecond / time.Second), + c: conf.MustRedisClient(), + }) return cacheIndex, cacheIndex } diff --git a/internal/dao/cache/redis.go b/internal/dao/cache/redis.go index 2d626d46..bde3906d 100644 --- a/internal/dao/cache/redis.go +++ b/internal/dao/cache/redis.go @@ -10,27 +10,68 @@ import ( "time" "unsafe" + "github.com/Masterminds/semver/v3" "github.com/rocboss/paopao-ce/internal/core" "github.com/rueian/rueidis" ) var ( _ core.RedisCache = (*redisCache)(nil) + _ tweetsCache = (*redisCacheTweetsCache)(nil) ) const ( - _pushToSearchJobKey = "JOB_PUSH_TO_SEARCH" - _countLoginErrKey = "PaoPaoUserLoginErr:" - _imgCaptchaKey = "PaoPaoCaptcha:" - _smsCaptchaKey = "PaoPaoSmsCaptcha:" - _countWhisperKey = "WhisperTimes:" - _rechargeStatusKey = "PaoPaoRecharge:" + _cacheIndexKeyPattern = _cacheIndexKey + "*" + _pushToSearchJobKey = "paopao_push_to_search_job" + _countLoginErrKey = "paopao_count_login_err" + _imgCaptchaKey = "paopao_img_captcha:" + _smsCaptchaKey = "paopao_sms_captcha" + _countWhisperKey = "paopao_whisper_key" + _rechargeStatusKey = "paopao_recharge_status:" ) type redisCache struct { c rueidis.Client } +type redisCacheTweetsCache struct { + expireDuration time.Duration + expireInSecond int64 + c rueidis.Client +} + +func (s *redisCacheTweetsCache) getTweetsBytes(key string) ([]byte, error) { + res, err := rueidis.MGetCache(s.c, context.Background(), s.expireDuration, []string{key}) + if err != nil { + return nil, err + } + message := res[key] + return message.AsBytes() +} + +func (s *redisCacheTweetsCache) setTweetsBytes(key string, bs []byte) error { + cmd := s.c.B().Set().Key(key).Value(rueidis.BinaryString(bs)).ExSeconds(s.expireInSecond).Build() + return s.c.Do(context.Background(), cmd).Error() +} + +func (s *redisCacheTweetsCache) delTweets(keys []string) error { + cmd := s.c.B().Del().Key(keys...).Build() + return s.c.Do(context.Background(), cmd).Error() +} + +func (s *redisCacheTweetsCache) allKeys() ([]string, error) { + cmd := s.c.B().Keys().Pattern(_cacheIndexKeyPattern).Build() + return s.c.Do(context.Background(), cmd).AsStrSlice() +} + +func (s *redisCacheTweetsCache) Name() string { + return "RedisCacheIndex" +} + +func (s *redisCacheTweetsCache) Version() *semver.Version { + return semver.MustParse("v0.1.0") +} + func (r *redisCache) SetPushToSearchJob(ctx context.Context) error { return r.c.Do(ctx, r.c.B().Set(). Key(_pushToSearchJobKey).Value("1"). diff --git a/internal/dao/jinzhu/jinzhu.go b/internal/dao/jinzhu/jinzhu.go index 6d15f1b5..790ee6ad 100644 --- a/internal/dao/jinzhu/jinzhu.go +++ b/internal/dao/jinzhu/jinzhu.go @@ -63,16 +63,23 @@ func NewDataService() (core.DataService, core.VersionInfo) { } // initialize core.CacheIndexService - if cfg.If("SimpleCacheIndex") { - // simpleCache use special post index service - ips = newSimpleIndexPostsService(db, ths) - cis, v = cache.NewSimpleCacheIndexService(ips) - } else if cfg.If("BigCacheIndex") { - // TODO: make cache index post in different scence like friendship/followship/lightship - cis, v = cache.NewBigCacheIndexService(ips, ams) - } else { + cfg.On(cfg.Actions{ + "SimpleCacheIndex": func() { + // simpleCache use special post index service + ips = newSimpleIndexPostsService(db, ths) + cis, v = cache.NewSimpleCacheIndexService(ips) + }, + "BigCacheIndex": func() { + // TODO: make cache index post in different scence like friendship/followship/lightship + cis, v = cache.NewBigCacheIndexService(ips, ams) + }, + "RedisCacheIndex": func() { + cis, v = cache.NewRedisCacheIndexService(ips, ams) + }, + }, func() { + // defualt no cache cis, v = cache.NewNoneCacheIndexService(ips) - } + }) logrus.Infof("use %s as cache index service by version: %s", v.Name(), v.Version()) ds := &dataServant{