Merge branch 'x/gorm' into x/sqlx

r/paopao-ce-xtra
Michael Li 2 years ago
commit 1226a1a60d
No known key found for this signature in database

@ -4,6 +4,32 @@ All notable changes to paopao-ce are documented in this file.
## 0.3.0+dev ([`dev`](https://github.com/rocboss/paopao-ce/tree/dev)) ## 0.3.0+dev ([`dev`](https://github.com/rocboss/paopao-ce/tree/dev))
### Added
- add `RedisCacheIndex` feature [#250](https://github.com/rocboss/paopao-ce/pull/250)
### Changed
- use [github.com/rueian/rueidis](https://github.com/rueian/rueidis) as Redis client [#249](https://github.com/rocboss/paopao-ce/pull/249)
the **Old** redis client configure field
```yaml
...
Redis:
Host: redis:6379
Password:
DB:
```
the **New** redis client configure field
```yaml
...
Redis:
InitAddress:
- redis:6379
Username:
Password:
SelectDB:
ConnWriteTimeout: 60 # 连接写超时时间 多少秒 默认 60秒
```
## 0.2.3 ## 0.2.3
### Added ### Added

@ -359,6 +359,7 @@ release/paopao-ce --no-default-features --features sqlite3,localoss,loggerfile,r
|`Redis` | 缓存 | 稳定 | Redis缓存功能 | |`Redis` | 缓存 | 稳定 | Redis缓存功能 |
|`SimpleCacheIndex` | 缓存 | Deprecated | 提供简单的 广场推文列表 的缓存功能 | |`SimpleCacheIndex` | 缓存 | Deprecated | 提供简单的 广场推文列表 的缓存功能 |
|`BigCacheIndex` | 缓存 | 稳定(推荐) | 使用[BigCache](https://github.com/allegro/bigcache)缓存 广场推文列表,缓存每个用户每一页,简单做到千人千面 | |`BigCacheIndex` | 缓存 | 稳定(推荐) | 使用[BigCache](https://github.com/allegro/bigcache)缓存 广场推文列表,缓存每个用户每一页,简单做到千人千面 |
|`RedisCacheIndex` | 缓存 | 内测(推荐) | 使用Redis缓存 广场推文列表,缓存每个用户每一页,简单做到千人千面 |
|`Zinc` | 搜索 | 稳定(推荐) | 基于[Zinc](https://github.com/zinclabs/zinc)搜索引擎提供推文搜索服务 | |`Zinc` | 搜索 | 稳定(推荐) | 基于[Zinc](https://github.com/zinclabs/zinc)搜索引擎提供推文搜索服务 |
|`Meili` | 搜索 | 稳定(推荐) | 基于[Meilisearch](https://github.com/meilisearch/meilisearch)搜索引擎提供推文搜索服务 | |`Meili` | 搜索 | 稳定(推荐) | 基于[Meilisearch](https://github.com/meilisearch/meilisearch)搜索引擎提供推文搜索服务 |
|`Bleve` | 搜索 | WIP | 基于[Bleve](https://github.com/blevesearch/bleve)搜索引擎提供推文搜索服务 | |`Bleve` | 搜索 | WIP | 基于[Bleve](https://github.com/blevesearch/bleve)搜索引擎提供推文搜索服务 |

@ -6,15 +6,16 @@
* [ ] add `Followship` feature * [ ] add `Followship` feature
* [ ] add `Auth:Bcrypt` feature * [ ] add `Auth:Bcrypt` feature
* [ ] add `Auth:MD5` feature (just for compatible) * [ ] add `Auth:MD5` feature (just for compatible)
* [x] add `RedisCacheIndex` feature
* [ ] add extend base ORM code for implement data logic base sqlx/sqlc * [ ] add extend base ORM code for implement data logic base sqlx/sqlc
* [ ] optimize media tweet submit logic * [ ] optimize media tweet submit logic
* [ ] optimize search logic service * [ ] optimize search logic service
* [x] remove `Deprecated:OldWeb` feature * [x] remove `Deprecated:OldWeb` feature
#### v0.2.0 #### v0.2.0
* [x] add `Friendship` feature * [x] add `Friendship` feature
* [x] add `Lightship` feature * [x] add `Lightship` feature
* [ ] add extend base ORM code for implement data logic base sqlx/sqlc
* [x] add `Pyroscope` feature * [x] add `Pyroscope` feature
* [x] add new `Web` service * [x] add new `Web` service
* [x] add `Frontend:Web` feature * [x] add `Frontend:Web` feature

@ -105,6 +105,10 @@
* [ ] 提按文档 * [ ] 提按文档
* [x] 接口定义 * [x] 接口定义
* [x] 业务逻辑实现 * [x] 业务逻辑实现
* `RedisCacheIndex` 使用Redis缓存 广场推文列表,缓存每个用户每一页,简单做到千人千面(目前状态: 内测阶段,推荐使用)
* [ ] 提按文档
* [x] 接口定义
* [x] 业务逻辑实现
#### 搜索: #### 搜索:
* `Zinc` 基于[Zinc](https://github.com/zinclabs/zinc)搜索引擎提供推文搜索服务(目前状态: 稳定,推荐使用) * `Zinc` 基于[Zinc](https://github.com/zinclabs/zinc)搜索引擎提供推文搜索服务(目前状态: 稳定,推荐使用)

@ -36,6 +36,7 @@ var (
CacheIndexSetting *CacheIndexSettingS CacheIndexSetting *CacheIndexSettingS
SimpleCacheIndexSetting *SimpleCacheIndexSettingS SimpleCacheIndexSetting *SimpleCacheIndexSettingS
BigCacheIndexSetting *BigCacheIndexSettingS BigCacheIndexSetting *BigCacheIndexSettingS
RedisCacheIndexSetting *RedisCacheIndexSettingS
SmsJuheSetting *SmsJuheSettings SmsJuheSetting *SmsJuheSettings
AlipaySetting *AlipaySettingS AlipaySetting *AlipaySettingS
TweetSearchSetting *TweetSearchS TweetSearchSetting *TweetSearchS
@ -78,6 +79,7 @@ func setupSetting(suite []string, noDefault bool) error {
"CacheIndex": &CacheIndexSetting, "CacheIndex": &CacheIndexSetting,
"SimpleCacheIndex": &SimpleCacheIndexSetting, "SimpleCacheIndex": &SimpleCacheIndexSetting,
"BigCacheIndex": &BigCacheIndexSetting, "BigCacheIndex": &BigCacheIndexSetting,
"RedisCacheIndex": &RedisCacheIndexSetting,
"Alipay": &AlipaySetting, "Alipay": &AlipaySetting,
"SmsJuhe": &SmsJuheSetting, "SmsJuhe": &SmsJuheSetting,
"Pyroscope": &PyroscopeSetting, "Pyroscope": &PyroscopeSetting,
@ -110,6 +112,7 @@ func setupSetting(suite []string, noDefault bool) error {
SimpleCacheIndexSetting.CheckTickDuration *= time.Second SimpleCacheIndexSetting.CheckTickDuration *= time.Second
SimpleCacheIndexSetting.ExpireTickDuration *= time.Second SimpleCacheIndexSetting.ExpireTickDuration *= time.Second
BigCacheIndexSetting.ExpireInSecond *= time.Second BigCacheIndexSetting.ExpireInSecond *= time.Second
RedisCacheIndexSetting.ExpireInSecond *= time.Second
redisSetting.ConnWriteTimeout *= time.Second redisSetting.ConnWriteTimeout *= time.Second
Mutex = &sync.Mutex{} Mutex = &sync.Mutex{}

@ -73,6 +73,9 @@ BigCacheIndex: # 使用BigCache缓存泡泡广场消息流
HardMaxCacheSize: 256 # 最大缓存大小(MB)0表示无限制 HardMaxCacheSize: 256 # 最大缓存大小(MB)0表示无限制
Verbose: False # 是否打印cache操作的log Verbose: False # 是否打印cache操作的log
ExpireInSecond: 300 # 多少秒(>0)后强制过期缓存 ExpireInSecond: 300 # 多少秒(>0)后强制过期缓存
RedisCacheIndex: # 使用Redis缓存泡泡广场消息流
Verbose: False # 是否打印cache操作的log
ExpireInSecond: 300 # 多少秒(>0)后强制过期缓存
Pyroscope: # Pyroscope配置 Pyroscope: # Pyroscope配置
AppName: "paopao-ce" # application name AppName: "paopao-ce" # application name
Endpoint: "http://localhost:4040" # Pyroscope server address Endpoint: "http://localhost:4040" # Pyroscope server address

@ -97,6 +97,11 @@ type BigCacheIndexSettingS struct {
Verbose bool Verbose bool
} }
type RedisCacheIndexSettingS struct {
ExpireInSecond time.Duration
Verbose bool
}
type AlipaySettingS struct { type AlipaySettingS struct {
AppID string AppID string
PrivateKey string PrivateKey string

@ -0,0 +1,229 @@
// Copyright 2023 ROC. All rights reserved.
// Use of this source code is governed by a MIT style
// license that can be found in the LICENSE file.
package cache
import (
"bytes"
"encoding/gob"
"fmt"
"strconv"
"strings"
"time"
"github.com/Masterminds/semver/v3"
"github.com/rocboss/paopao-ce/internal/conf"
"github.com/rocboss/paopao-ce/internal/core"
"github.com/rocboss/paopao-ce/pkg/types"
"github.com/sirupsen/logrus"
)
const (
_cacheIndexKey = "paopao_index"
)
var (
_ core.CacheIndexService = (*cacheIndexSrv)(nil)
_ core.VersionInfo = (*cacheIndexSrv)(nil)
)
type postsEntry struct {
key string
tweets *core.IndexTweetList
}
type tweetsCache interface {
core.VersionInfo
getTweetsBytes(key string) ([]byte, error)
setTweetsBytes(key string, bs []byte) error
delTweets(keys []string) error
allKeys() ([]string, error)
}
type cacheIndexSrv struct {
ips core.IndexPostsService
ams core.AuthorizationManageService
name string
version *semver.Version
indexActionCh chan *core.IndexAction
cachePostsCh chan *postsEntry
cache tweetsCache
lastCacheResetTime time.Time
preventDuration time.Duration
}
func (s *cacheIndexSrv) IndexPosts(user *core.User, offset int, limit int) (*core.IndexTweetList, error) {
key := s.keyFrom(user, offset, limit)
posts, err := s.getPosts(key)
if err == nil {
logrus.Debugf("cacheIndexSrv.IndexPosts get index posts from cache by key: %s", key)
return posts, nil
}
if posts, err = s.ips.IndexPosts(user, offset, limit); err != nil {
return nil, err
}
logrus.Debugf("cacheIndexSrv.IndexPosts get index posts from database by key: %s", key)
s.cachePosts(key, posts)
return posts, nil
}
func (s *cacheIndexSrv) getPosts(key string) (*core.IndexTweetList, error) {
data, err := s.cache.getTweetsBytes(key)
if err != nil {
logrus.Debugf("cacheIndexSrv.getPosts get posts by key: %s from cache err: %v", key, err)
return nil, err
}
buf := bytes.NewBuffer(data)
dec := gob.NewDecoder(buf)
var resp core.IndexTweetList
if err := dec.Decode(&resp); err != nil {
logrus.Debugf("cacheIndexSrv.getPosts get posts from cache in decode err: %v", err)
return nil, err
}
return &resp, nil
}
func (s *cacheIndexSrv) cachePosts(key string, tweets *core.IndexTweetList) {
entry := &postsEntry{key: key, tweets: tweets}
select {
case s.cachePostsCh <- entry:
logrus.Debugf("cacheIndexSrv.cachePosts cachePosts by chan of key: %s", key)
default:
go func(ch chan<- *postsEntry, entry *postsEntry) {
logrus.Debugf("cacheIndexSrv.cachePosts cachePosts indexAction by goroutine of key: %s", key)
ch <- entry
}(s.cachePostsCh, entry)
}
}
func (s *cacheIndexSrv) setPosts(entry *postsEntry) {
var buf bytes.Buffer
enc := gob.NewEncoder(&buf)
if err := enc.Encode(entry.tweets); err != nil {
logrus.Debugf("cacheIndexSrv.setPosts setPosts encode post entry err: %v", err)
return
}
if err := s.cache.setTweetsBytes(entry.key, buf.Bytes()); err != nil {
logrus.Debugf("cacheIndexSrv.setPosts setPosts set cache err: %v", err)
}
logrus.Debugf("cacheIndexSrv.setPosts setPosts set cache by key: %s", entry.key)
}
func (s *cacheIndexSrv) keyFrom(user *core.User, offset int, limit int) string {
var userId int64 = -1
if user != nil {
userId = user.ID
}
return fmt.Sprintf("%s:%d:%d:%d", _cacheIndexKey, userId, offset, limit)
}
func (s *cacheIndexSrv) SendAction(act core.IdxAct, post *core.Post) {
action := core.NewIndexAction(act, post)
select {
case s.indexActionCh <- action:
logrus.Debugf("cacheIndexSrv.SendAction send indexAction by chan: %s", act)
default:
go func(ch chan<- *core.IndexAction, act *core.IndexAction) {
logrus.Debugf("cacheIndexSrv.SendAction send indexAction by goroutine: %s", action.Act)
ch <- act
}(s.indexActionCh, action)
}
}
func (s *cacheIndexSrv) startIndexPosts() {
for {
select {
case entry := <-s.cachePostsCh:
s.setPosts(entry)
case action := <-s.indexActionCh:
s.handleIndexAction(action)
}
}
}
func (s *cacheIndexSrv) handleIndexAction(action *core.IndexAction) {
act, post := action.Act, action.Post
// 创建/删除 私密推文特殊处理
switch act {
case core.IdxActCreatePost, core.IdxActDeletePost:
if post.Visibility == core.PostVisitPrivate {
s.deleteCacheByUserId(post.UserID, true)
return
}
}
// 如果在s.preventDuration时间内就清除所有缓存否则只清除自个儿的缓存
// TODO: 需要优化只清除受影响的缓存,后续完善
if time.Since(s.lastCacheResetTime) > s.preventDuration {
s.deleteCacheByUserId(post.UserID, false)
} else {
s.deleteCacheByUserId(post.UserID, true)
}
}
func (s *cacheIndexSrv) deleteCacheByUserId(id int64, oneself bool) {
var keys []string
userId := strconv.FormatInt(id, 10)
friendSet := core.FriendSet{}
if !oneself {
friendSet = s.ams.MyFriendSet(id)
}
friendSet[userId] = types.Empty{}
// 获取需要删除缓存的key目前是仅删除自个儿的缓存
allKeys, err := s.cache.allKeys()
if err != nil {
logrus.Debugf("cacheIndexSrv.deleteCacheByUserId userId: %s err:%s", userId, err)
}
for _, key := range allKeys {
keyParts := strings.Split(key, ":")
if len(keyParts) > 2 && keyParts[0] == "index" {
if _, ok := friendSet[keyParts[1]]; ok {
keys = append(keys, key)
}
}
}
// 执行删缓存
s.cache.delTweets(keys)
s.lastCacheResetTime = time.Now()
logrus.Debugf("cacheIndexSrv.deleteCacheByUserId userId:%s oneself:%t keys:%d", userId, oneself, len(keys))
}
func (s *cacheIndexSrv) Name() string {
return s.name
}
func (s *cacheIndexSrv) Version() *semver.Version {
return s.version
}
func newCacheIndexSrv(ips core.IndexPostsService, ams core.AuthorizationManageService, tc tweetsCache) *cacheIndexSrv {
cacheIndex := &cacheIndexSrv{
ips: ips,
ams: ams,
cache: tc,
name: tc.Name(),
version: tc.Version(),
preventDuration: 10 * time.Second,
}
// indexActionCh capacity custom configure by conf.yaml need in [10, 10000]
// or re-compile source to adjust min/max capacity
capacity := conf.CacheIndexSetting.MaxUpdateQPS
if capacity < 10 {
capacity = 10
} else if capacity > 10000 {
capacity = 10000
}
cacheIndex.indexActionCh = make(chan *core.IndexAction, capacity)
cacheIndex.cachePostsCh = make(chan *postsEntry, capacity)
// 启动索引更新器
go cacheIndex.startIndexPosts()
return cacheIndex
}

@ -5,196 +5,51 @@
package cache package cache
import ( import (
"bytes"
"encoding/gob"
"fmt"
"strconv"
"strings"
"time"
"github.com/Masterminds/semver/v3" "github.com/Masterminds/semver/v3"
"github.com/allegro/bigcache/v3" "github.com/allegro/bigcache/v3"
"github.com/rocboss/paopao-ce/internal/core"
"github.com/rocboss/paopao-ce/internal/core/cs"
"github.com/rocboss/paopao-ce/pkg/debug"
"github.com/rocboss/paopao-ce/pkg/types"
"github.com/sirupsen/logrus"
) )
var ( var (
_ core.CacheIndexService = (*bigCacheIndexServant)(nil) _ tweetsCache = (*bigCacheTweetsCache)(nil)
_ core.VersionInfo = (*bigCacheIndexServant)(nil)
) )
type postsEntry struct { type bigCacheTweetsCache struct {
key string name string
tweets *core.IndexTweetList version *semver.Version
} bc *bigcache.BigCache
type bigCacheIndexServant struct {
ips core.IndexPostsService
ams core.AuthorizationManageService
indexActionCh chan *core.IndexAction
cachePostsCh chan *postsEntry
cache *bigcache.BigCache
lastCacheResetTime time.Time
preventDuration time.Duration
}
func (s *bigCacheIndexServant) IndexPosts(user *core.User, offset int, limit int) (*core.IndexTweetList, error) {
key := s.keyFrom(user, offset, limit)
posts, err := s.getPosts(key)
if err == nil {
logrus.Debugf("bigCacheIndexServant.IndexPosts get index posts from cache by key: %s", key)
return posts, nil
}
if posts, err = s.ips.IndexPosts(user, offset, limit); err != nil {
return nil, err
}
logrus.Debugf("bigCacheIndexServant.IndexPosts get index posts from database by key: %s", key)
s.cachePosts(key, posts)
return posts, nil
}
func (s *bigCacheIndexServant) TweetTimeline(userId int64, offset int, limit int) (*cs.TweetBox, error) {
// TODO
return nil, debug.ErrNotImplemented
}
func (s *bigCacheIndexServant) getPosts(key string) (*core.IndexTweetList, error) {
data, err := s.cache.Get(key)
if err != nil {
logrus.Debugf("bigCacheIndexServant.getPosts get posts by key: %s from cache err: %v", key, err)
return nil, err
}
buf := bytes.NewBuffer(data)
dec := gob.NewDecoder(buf)
var resp core.IndexTweetList
if err := dec.Decode(&resp); err != nil {
logrus.Debugf("bigCacheIndexServant.getPosts get posts from cache in decode err: %v", err)
return nil, err
}
return &resp, nil
} }
func (s *bigCacheIndexServant) cachePosts(key string, tweets *core.IndexTweetList) { func (s *bigCacheTweetsCache) getTweetsBytes(key string) ([]byte, error) {
entry := &postsEntry{key: key, tweets: tweets} return s.bc.Get(key)
select {
case s.cachePostsCh <- entry:
logrus.Debugf("bigCacheIndexServant.cachePosts cachePosts by chan of key: %s", key)
default:
go func(ch chan<- *postsEntry, entry *postsEntry) {
logrus.Debugf("bigCacheIndexServant.cachePosts cachePosts indexAction by goroutine of key: %s", key)
ch <- entry
}(s.cachePostsCh, entry)
}
} }
func (s *bigCacheIndexServant) setPosts(entry *postsEntry) { func (s *bigCacheTweetsCache) setTweetsBytes(key string, bs []byte) error {
var buf bytes.Buffer return s.bc.Set(key, bs)
enc := gob.NewEncoder(&buf)
if err := enc.Encode(entry.tweets); err != nil {
logrus.Debugf("bigCacheIndexServant.setPosts setPosts encode post entry err: %v", err)
return
}
if err := s.cache.Set(entry.key, buf.Bytes()); err != nil {
logrus.Debugf("bigCacheIndexServant.setPosts setPosts set cache err: %v", err)
}
logrus.Debugf("bigCacheIndexServant.setPosts setPosts set cache by key: %s", entry.key)
} }
func (s *bigCacheIndexServant) keyFrom(user *core.User, offset int, limit int) string { func (s *bigCacheTweetsCache) delTweets(keys []string) error {
var userId int64 = -1 for _, k := range keys {
if user != nil { s.bc.Delete(k)
userId = user.ID
}
return fmt.Sprintf("index:%d:%d:%d", userId, offset, limit)
}
func (s *bigCacheIndexServant) SendAction(act core.IdxAct, post *core.Post) {
action := core.NewIndexAction(act, post)
select {
case s.indexActionCh <- action:
logrus.Debugf("bigCacheIndexServant.SendAction send indexAction by chan: %s", act)
default:
go func(ch chan<- *core.IndexAction, act *core.IndexAction) {
logrus.Debugf("bigCacheIndexServant.SendAction send indexAction by goroutine: %s", action.Act)
ch <- act
}(s.indexActionCh, action)
}
}
func (s *bigCacheIndexServant) startIndexPosts() {
for {
select {
case entry := <-s.cachePostsCh:
s.setPosts(entry)
case action := <-s.indexActionCh:
s.handleIndexAction(action)
}
}
}
func (s *bigCacheIndexServant) handleIndexAction(action *core.IndexAction) {
act, post := action.Act, action.Post
// 创建/删除 私密推文特殊处理
switch act {
case core.IdxActCreatePost, core.IdxActDeletePost:
if post.Visibility == core.PostVisitPrivate {
s.deleteCacheByUserId(post.UserID, true)
return
}
}
// 如果在s.preventDuration时间内就清除所有缓存否则只清除自个儿的缓存
// TODO: 需要优化只清除受影响的缓存,后续完善
if time.Since(s.lastCacheResetTime) > s.preventDuration {
s.deleteCacheByUserId(post.UserID, false)
} else {
s.deleteCacheByUserId(post.UserID, true)
} }
return nil
} }
func (s *bigCacheIndexServant) deleteCacheByUserId(id int64, oneself bool) { func (s *bigCacheTweetsCache) allKeys() ([]string, error) {
var keys []string var keys []string
userId := strconv.FormatInt(id, 10) for it := s.bc.Iterator(); it.SetNext(); {
friendSet := core.FriendSet{}
if !oneself {
friendSet = s.ams.MyFriendSet(id)
}
friendSet[userId] = types.Empty{}
// 获取需要删除缓存的key目前是仅删除自个儿的缓存
for it := s.cache.Iterator(); it.SetNext(); {
entry, err := it.Value() entry, err := it.Value()
if err != nil { if err != nil {
logrus.Debugf("bigCacheIndexServant.deleteCacheByUserId userId: %s err:%s", userId, err) return nil, err
return
} }
key := entry.Key() keys = append(keys, entry.Key())
keyParts := strings.Split(key, ":")
if len(keyParts) > 2 && keyParts[0] == "index" {
if _, ok := friendSet[keyParts[1]]; ok {
keys = append(keys, key)
}
}
}
// 执行删缓存
for _, k := range keys {
s.cache.Delete(k)
} }
s.lastCacheResetTime = time.Now() return keys, nil
logrus.Debugf("bigCacheIndexServant.deleteCacheByUserId userId:%s oneself:%t keys:%d", userId, oneself, len(keys))
} }
func (s *bigCacheIndexServant) Name() string { func (s *bigCacheTweetsCache) Name() string {
return "BigCacheIndex" return "BigCacheIndex"
} }
func (s *bigCacheIndexServant) Version() *semver.Version { func (s *bigCacheTweetsCache) Version() *semver.Version {
return semver.MustParse("v0.2.0") return semver.MustParse("v0.2.0")
} }

@ -27,32 +27,23 @@ func NewBigCacheIndexService(ips core.IndexPostsService, ams core.AuthorizationM
c.Verbose = s.Verbose c.Verbose = s.Verbose
c.MaxEntrySize = 10000 c.MaxEntrySize = 10000
c.Logger = logrus.StandardLogger() c.Logger = logrus.StandardLogger()
cache, err := bigcache.NewBigCache(c)
bc, err := bigcache.NewBigCache(c)
if err != nil { if err != nil {
logrus.Fatalf("initial bigCahceIndex failure by err: %v", err) logrus.Fatalf("initial bigCahceIndex failure by err: %v", err)
} }
cacheIndex := newCacheIndexSrv(ips, ams, &bigCacheTweetsCache{
bc: bc,
})
return cacheIndex, cacheIndex
}
cacheIndex := &bigCacheIndexServant{ func NewRedisCacheIndexService(ips core.IndexPostsService, ams core.AuthorizationManageService) (core.CacheIndexService, core.VersionInfo) {
ips: ips, cacheIndex := newCacheIndexSrv(ips, ams, &redisCacheTweetsCache{
ams: ams, expireDuration: conf.RedisCacheIndexSetting.ExpireInSecond,
cache: cache, expireInSecond: int64(conf.RedisCacheIndexSetting.ExpireInSecond / time.Second),
preventDuration: 10 * time.Second, c: conf.MustRedisClient(),
} })
// indexActionCh capacity custom configure by conf.yaml need in [10, 10000]
// or re-compile source to adjust min/max capacity
capacity := conf.CacheIndexSetting.MaxUpdateQPS
if capacity < 10 {
capacity = 10
} else if capacity > 10000 {
capacity = 10000
}
cacheIndex.indexActionCh = make(chan *core.IndexAction, capacity)
cacheIndex.cachePostsCh = make(chan *postsEntry, capacity)
// 启动索引更新器
go cacheIndex.startIndexPosts()
return cacheIndex, cacheIndex return cacheIndex, cacheIndex
} }

@ -1,3 +1,7 @@
// Copyright 2023 ROC. All rights reserved.
// Use of this source code is governed by a MIT style
// license that can be found in the LICENSE file.
package cache package cache
import ( import (
@ -6,27 +10,68 @@ import (
"time" "time"
"unsafe" "unsafe"
"github.com/Masterminds/semver/v3"
"github.com/rocboss/paopao-ce/internal/core" "github.com/rocboss/paopao-ce/internal/core"
"github.com/rueian/rueidis" "github.com/rueian/rueidis"
) )
var ( var (
_ core.RedisCache = (*redisCache)(nil) _ core.RedisCache = (*redisCache)(nil)
_ tweetsCache = (*redisCacheTweetsCache)(nil)
) )
const ( const (
_pushToSearchJobKey = "JOB_PUSH_TO_SEARCH" _cacheIndexKeyPattern = _cacheIndexKey + "*"
_countLoginErrKey = "PaoPaoUserLoginErr:" _pushToSearchJobKey = "paopao_push_to_search_job"
_imgCaptchaKey = "PaoPaoCaptcha:" _countLoginErrKey = "paopao_count_login_err"
_smsCaptchaKey = "PaoPaoSmsCaptcha:" _imgCaptchaKey = "paopao_img_captcha:"
_countWhisperKey = "WhisperTimes:" _smsCaptchaKey = "paopao_sms_captcha"
_rechargeStatusKey = "PaoPaoRecharge:" _countWhisperKey = "paopao_whisper_key"
_rechargeStatusKey = "paopao_recharge_status:"
) )
type redisCache struct { type redisCache struct {
c rueidis.Client c rueidis.Client
} }
type redisCacheTweetsCache struct {
expireDuration time.Duration
expireInSecond int64
c rueidis.Client
}
func (s *redisCacheTweetsCache) getTweetsBytes(key string) ([]byte, error) {
res, err := rueidis.MGetCache(s.c, context.Background(), s.expireDuration, []string{key})
if err != nil {
return nil, err
}
message := res[key]
return message.AsBytes()
}
func (s *redisCacheTweetsCache) setTweetsBytes(key string, bs []byte) error {
cmd := s.c.B().Set().Key(key).Value(rueidis.BinaryString(bs)).ExSeconds(s.expireInSecond).Build()
return s.c.Do(context.Background(), cmd).Error()
}
func (s *redisCacheTweetsCache) delTweets(keys []string) error {
cmd := s.c.B().Del().Key(keys...).Build()
return s.c.Do(context.Background(), cmd).Error()
}
func (s *redisCacheTweetsCache) allKeys() ([]string, error) {
cmd := s.c.B().Keys().Pattern(_cacheIndexKeyPattern).Build()
return s.c.Do(context.Background(), cmd).AsStrSlice()
}
func (s *redisCacheTweetsCache) Name() string {
return "RedisCacheIndex"
}
func (s *redisCacheTweetsCache) Version() *semver.Version {
return semver.MustParse("v0.1.0")
}
func (r *redisCache) SetPushToSearchJob(ctx context.Context) error { func (r *redisCache) SetPushToSearchJob(ctx context.Context) error {
return r.c.Do(ctx, r.c.B().Set(). return r.c.Do(ctx, r.c.B().Set().
Key(_pushToSearchJobKey).Value("1"). Key(_pushToSearchJobKey).Value("1").

@ -79,16 +79,23 @@ func NewDataService() (core.DataService, core.VersionInfo) {
} }
// initialize core.CacheIndexService // initialize core.CacheIndexService
if cfg.If("SimpleCacheIndex") { cfg.On(cfg.Actions{
// simpleCache use special post index service "SimpleCacheIndex": func() {
ips = newSimpleIndexPostsService(db, ths) // simpleCache use special post index service
cis, v = cache.NewSimpleCacheIndexService(ips) ips = newSimpleIndexPostsService(db, ths)
} else if cfg.If("BigCacheIndex") { cis, v = cache.NewSimpleCacheIndexService(ips)
// TODO: make cache index post in different scence like friendship/followship/lightship },
cis, v = cache.NewBigCacheIndexService(ips, ams) "BigCacheIndex": func() {
} else { // TODO: make cache index post in different scence like friendship/followship/lightship
cis, v = cache.NewBigCacheIndexService(ips, ams)
},
"RedisCacheIndex": func() {
cis, v = cache.NewRedisCacheIndexService(ips, ams)
},
}, func() {
// defualt no cache
cis, v = cache.NewNoneCacheIndexService(ips) cis, v = cache.NewNoneCacheIndexService(ips)
} })
logrus.Infof("use %s as cache index service by version: %s", v.Name(), v.Version()) logrus.Infof("use %s as cache index service by version: %s", v.Name(), v.Version())
ds := &dataSrv{ ds := &dataSrv{

Loading…
Cancel
Save