diff --git a/auto/api/v1/loose.go b/auto/api/v1/loose.go index 4c9f9c6f..8c6fcbc8 100644 --- a/auto/api/v1/loose.go +++ b/auto/api/v1/loose.go @@ -89,7 +89,12 @@ func RegisterLooseServant(e *gin.Engine, s Loose) { return } resp, err := s.GetUserTweets(req) - s.Render(c, resp, err) + if err != nil { + s.Render(c, nil, err) + return + } + var rv _render_ = resp + rv.Render(c) }) router.Handle("GET", "/posts", func(c *gin.Context) { select { diff --git a/internal/conf/cache.go b/internal/conf/cache.go index ae3875b2..8f69a01c 100644 --- a/internal/conf/cache.go +++ b/internal/conf/cache.go @@ -14,6 +14,11 @@ const ( _defaultKeyPoolSize = 128 ) +// 以下包含一些在cache中会用到的key的前缀 +const ( + PrefixUserTweets = "paopao:usertweets:" +) + // 以下包含一些在cache中会用到的池化后的key var ( KeyUnreadMsg cache.KeyPool[int64] diff --git a/internal/conf/config.yaml b/internal/conf/config.yaml index 5f6c8ceb..20fe34ad 100644 --- a/internal/conf/config.yaml +++ b/internal/conf/config.yaml @@ -9,6 +9,7 @@ Cache: KeyPoolSize: 256 # 键的池大小, 设置范围[128, ++], 默认256 CientSideCacheExpire: 60 # 客户端缓存过期时间 默认60s UnreadMsgExpire: 60 # 未读消息过期时间,单位秒, 默认60s + UserTweetsExpire: 60 # 获取用户推文列表过期时间,单位秒, 默认60s EventManager: # 事件管理器的配置参数 MinWorker: 10 # 最小后台工作者, 设置范围[5, ++], 默认10 MaxEventBuf: 100 # 最大log缓存条数, 设置范围[10, ++], 默认100 diff --git a/internal/conf/setting.go b/internal/conf/setting.go index 04fcd9c9..0f64f446 100644 --- a/internal/conf/setting.go +++ b/internal/conf/setting.go @@ -100,6 +100,7 @@ type cacheConf struct { KeyPoolSize int CientSideCacheExpire time.Duration UnreadMsgExpire int64 + UserTweetsExpire int64 } type eventManagerConf struct { diff --git a/internal/core/cache.go b/internal/core/cache.go index 2a0db675..335a62ae 100644 --- a/internal/core/cache.go +++ b/internal/core/cache.go @@ -98,8 +98,18 @@ type RedisCache interface { DelRechargeStatus(ctx context.Context, tradeNo string) error } +type AppCache interface { + Get(key string) ([]byte, error) + Set(key string, data []byte, ex int64) error + Delete(key ...string) error + DelAny(pattern string) error + Exist(key string) bool +} + type WebCache interface { + AppCache GetUnreadMsgCountResp(uid int64) ([]byte, error) PutUnreadMsgCountResp(uid int64, data []byte) error DelUnreadMsgCountResp(uid int64) error + ExistUnreadMsgCountResp(uid int64) bool } diff --git a/internal/dao/cache/cache.go b/internal/dao/cache/cache.go index 05f6fce8..2cee6684 100644 --- a/internal/dao/cache/cache.go +++ b/internal/dao/cache/cache.go @@ -58,6 +58,11 @@ func NewWebCache() core.WebCache { return _webCache } +func NewAppCache() core.AppCache { + lazyInitial() + return _appCache +} + func NewSimpleCacheIndexService(indexPosts core.IndexPostsService) (core.CacheIndexService, core.VersionInfo) { s := conf.SimpleCacheIndexSetting cacheIndex := &simpleCacheIndexServant{ @@ -101,6 +106,7 @@ func NewNoneCacheIndexService(indexPosts core.IndexPostsService) (core.CacheInde func lazyInitial() { _onceInit.Do(func() { - _webCache = newWebCache() + _appCache = newAppCache() + _webCache = newWebCache(_appCache) }) } diff --git a/internal/dao/cache/web.go b/internal/dao/cache/web.go index d911c5bc..6db706c0 100644 --- a/internal/dao/cache/web.go +++ b/internal/dao/cache/web.go @@ -8,7 +8,6 @@ import ( "context" "time" - "github.com/Masterminds/semver/v3" "github.com/redis/rueidis" "github.com/rocboss/paopao-ce/internal/conf" "github.com/rocboss/paopao-ce/internal/core" @@ -16,25 +15,21 @@ import ( ) var ( - _webCache core.WebCache = (*redisWebCache)(nil) + _webCache core.WebCache = (*webCache)(nil) + _appCache core.AppCache = (*appCache)(nil) ) -type redisWebCache struct { - cscExpire time.Duration - unreadMsgExpire int64 - c rueidis.Client -} - -func (s *redisWebCache) Name() string { - return "RedisWebCache" +type appCache struct { + cscExpire time.Duration + c rueidis.Client } -func (s *redisWebCache) Version() *semver.Version { - return semver.MustParse("v0.1.0") +type webCache struct { + core.AppCache + unreadMsgExpire int64 } -func (s *redisWebCache) GetUnreadMsgCountResp(uid int64) ([]byte, error) { - key := conf.KeyUnreadMsg.Get(uid) +func (s *appCache) Get(key string) ([]byte, error) { res, err := rueidis.MGetCache(s.c, context.Background(), s.cscExpire, []string{key}) if err != nil { return nil, err @@ -43,24 +38,80 @@ func (s *redisWebCache) GetUnreadMsgCountResp(uid int64) ([]byte, error) { return message.AsBytes() } -func (s *redisWebCache) PutUnreadMsgCountResp(uid int64, data []byte) error { +func (s *appCache) Set(key string, data []byte, ex int64) error { return s.c.Do(context.Background(), s.c.B().Set(). - Key(conf.KeyUnreadMsg.Get(uid)). + Key(key). Value(utils.String(data)). - ExSeconds(s.unreadMsgExpire). + ExSeconds(ex). Build()). Error() } -func (s *redisWebCache) DelUnreadMsgCountResp(uid int64) error { - return s.c.Do(context.Background(), s.c.B().Del().Key(conf.KeyUnreadMsg.Get(uid)).Build()).Error() +func (s *appCache) Delete(keys ...string) (err error) { + if len(keys) != 0 { + err = s.c.Do(context.Background(), s.c.B().Del().Key(keys...).Build()).Error() + } + return +} + +func (s *appCache) DelAny(pattern string) (err error) { + var ( + keys []string + cursor uint64 + entry rueidis.ScanEntry + ) + ctx := context.Background() + for { + cmd := s.c.B().Scan().Cursor(cursor).Match(pattern).Count(50).Build() + if entry, err = s.c.Do(ctx, cmd).AsScanEntry(); err != nil { + return + } + keys = append(keys, entry.Elements...) + if entry.Cursor != 0 { + cursor = entry.Cursor + continue + } + break + } + if len(keys) != 0 { + err = s.c.Do(ctx, s.c.B().Del().Key(keys...).Build()).Error() + } + return +} + +func (s *appCache) Exist(key string) bool { + cmd := s.c.B().Exists().Key(key).Build() + count, _ := s.c.Do(context.Background(), cmd).AsInt64() + return count > 0 +} + +func (s *webCache) GetUnreadMsgCountResp(uid int64) ([]byte, error) { + key := conf.KeyUnreadMsg.Get(uid) + return s.Get(key) +} + +func (s *webCache) PutUnreadMsgCountResp(uid int64, data []byte) error { + return s.Set(conf.KeyUnreadMsg.Get(uid), data, s.unreadMsgExpire) +} + +func (s *webCache) DelUnreadMsgCountResp(uid int64) error { + return s.Delete(conf.KeyUnreadMsg.Get(uid)) +} + +func (s *webCache) ExistUnreadMsgCountResp(uid int64) bool { + return s.Exist(conf.KeyUnreadMsg.Get(uid)) +} + +func newAppCache() *appCache { + return &appCache{ + cscExpire: conf.CacheSetting.CientSideCacheExpire, + c: conf.MustRedisClient(), + } } -func newWebCache() *redisWebCache { - s := conf.CacheSetting - return &redisWebCache{ - cscExpire: s.CientSideCacheExpire, - unreadMsgExpire: s.UnreadMsgExpire, - c: conf.MustRedisClient(), +func newWebCache(ac core.AppCache) *webCache { + return &webCache{ + AppCache: ac, + unreadMsgExpire: conf.CacheSetting.UnreadMsgExpire, } } diff --git a/internal/model/web/loose.go b/internal/model/web/loose.go index 8ab19584..cba3e8ab 100644 --- a/internal/model/web/loose.go +++ b/internal/model/web/loose.go @@ -5,10 +5,14 @@ package web import ( + "encoding/json" + "net/http" + "github.com/alimy/mir/v4" "github.com/gin-gonic/gin" "github.com/rocboss/paopao-ce/internal/core" "github.com/rocboss/paopao-ce/internal/core/cs" + "github.com/rocboss/paopao-ce/internal/model/joint" "github.com/rocboss/paopao-ce/internal/servants/base" "github.com/rocboss/paopao-ce/pkg/app" ) @@ -59,7 +63,10 @@ type GetUserTweetsReq struct { PageSize int `form:"-" binding:"-"` } -type GetUserTweetsResp base.PageResp +type GetUserTweetsResp struct { + Data *base.PageResp + JsonResp json.RawMessage +} type GetUserProfileReq struct { BaseInfo `form:"-" binding:"-"` @@ -111,3 +118,15 @@ func (r *TimelineReq) Bind(c *gin.Context) mir.Error { r.Query, r.Type = c.Query("query"), "search" return nil } + +func (r *GetUserTweetsResp) Render(c *gin.Context) { + if len(r.JsonResp) != 0 { + c.JSON(http.StatusOK, r.JsonResp) + } else { + c.JSON(http.StatusOK, &joint.JsonResp{ + Code: 0, + Msg: "success", + Data: r.Data, + }) + } +} diff --git a/internal/servants/base/events.go b/internal/servants/base/events.go index c5e7d08e..545d3399 100644 --- a/internal/servants/base/events.go +++ b/internal/servants/base/events.go @@ -5,10 +5,36 @@ package base import ( + "fmt" + "github.com/alimy/tryst/event" + "github.com/rocboss/paopao-ce/internal/core" "github.com/rocboss/paopao-ce/internal/core/ms" + "github.com/rocboss/paopao-ce/internal/events" + "github.com/rocboss/paopao-ce/internal/model/joint" + "github.com/rocboss/paopao-ce/pkg/json" ) +type CacheRespEvent struct { + event.UnimplementedEvent + ac core.AppCache + key string + data any + expire int64 +} + +type ExpireRespEvent struct { + event.UnimplementedEvent + ac core.AppCache + keys []string +} + +type ExpireAnyRespEvent struct { + event.UnimplementedEvent + ac core.AppCache + pattern string +} + type pushPostToSearchEvent struct { event.UnimplementedEvent fn func(*ms.Post) @@ -20,6 +46,71 @@ type pushAllPostToSearchEvent struct { fn func() error } +func OnCacheRespEvent(ac core.AppCache, key string, data any, expire int64) { + events.OnEvent(&CacheRespEvent{ + ac: ac, + key: key, + data: data, + expire: expire, + }) +} + +func OnExpireRespEvent(ac core.AppCache, keys ...string) { + if len(keys) != 0 { + events.OnEvent(&ExpireRespEvent{ + ac: ac, + keys: keys, + }) + } +} + +func OnExpireAnyRespEvent(ac core.AppCache, pattern string) { + events.OnEvent(&ExpireAnyRespEvent{ + ac: ac, + pattern: pattern, + }) +} + +func (p *CacheRespEvent) Name() string { + return "servants.base.CacheRespEvent" +} + +func (p *CacheRespEvent) Action() error { + if p.ac.Exist(p.key) { + // do nothing + return nil + } + resp := &joint.JsonResp{ + Code: 0, + Msg: "success", + Data: p.data, + } + data, err := json.Marshal(resp) + if err != nil { + return fmt.Errorf("CacheRespEvent action marshal resp occurs error: %w", err) + } + if err = p.ac.Set(p.key, data, p.expire); err != nil { + return fmt.Errorf("CacheRespEvent action put resp data to redis cache occurs error: %w", err) + } + return nil +} + +func (p *ExpireRespEvent) Name() string { + return "servants.base.ExpireRespEvent" +} + +func (p *ExpireRespEvent) Action() (err error) { + return p.ac.Delete(p.keys...) +} + +func (p *ExpireAnyRespEvent) Name() string { + return "servants.base.ExpireAnyRespEvent" +} + +func (p *ExpireAnyRespEvent) Action() (err error) { + return p.ac.DelAny(p.pattern) +} + func (p *pushPostToSearchEvent) Name() string { return "servants.base.pushPostToSearchEvent" } diff --git a/internal/servants/web/events.go b/internal/servants/web/events.go index f5b354df..f3879d5e 100644 --- a/internal/servants/web/events.go +++ b/internal/servants/web/events.go @@ -51,6 +51,10 @@ func (e *cacheUnreadMsgEvent) Name() string { } func (e *cacheUnreadMsgEvent) Action() error { + if e.wc.ExistUnreadMsgCountResp(e.uid) { + // do nothing + return nil + } count, err := e.ds.GetUnreadCount(e.uid) if err != nil { return fmt.Errorf("cacheUnreadMsgEvent action occurs error: %w", err) diff --git a/internal/servants/web/loose.go b/internal/servants/web/loose.go index 642f445e..f08282d9 100644 --- a/internal/servants/web/loose.go +++ b/internal/servants/web/loose.go @@ -5,9 +5,13 @@ package web import ( + "fmt" + "github.com/alimy/mir/v4" + "github.com/alimy/tryst/lets" "github.com/gin-gonic/gin" api "github.com/rocboss/paopao-ce/auto/api/v1" + "github.com/rocboss/paopao-ce/internal/conf" "github.com/rocboss/paopao-ce/internal/core" "github.com/rocboss/paopao-ce/internal/core/cs" "github.com/rocboss/paopao-ce/internal/core/ms" @@ -25,6 +29,9 @@ var ( type looseSrv struct { api.UnimplementedLooseServant *base.DaoServant + ac core.AppCache + userTweetsExpire int64 + prefixUserTweets string } func (s *looseSrv) Chain() gin.HandlersChain { @@ -66,6 +73,13 @@ func (s *looseSrv) GetUserTweets(req *web.GetUserTweetsReq) (res *web.GetUserTwe if xerr != nil { return nil, err } + // 尝试直接从缓存中获取数据 + key, ok := "", false + if res, key, ok = s.userTweetsFromCache(req, user); ok { + // logrus.Debugf("GetUserTweets from cache key:%s", key) + return + } + // 缓存获取未成功,只能查库了 switch req.Style { case web.UserPostsStyleComment, web.UserPostsStyleMedia: res, err = s.listUserTweets(req, user) @@ -78,6 +92,24 @@ func (s *looseSrv) GetUserTweets(req *web.GetUserTweetsReq) (res *web.GetUserTwe default: res, err = s.getUserPostTweets(req, user, false) } + // 缓存处理 + if err == nil { + base.OnCacheRespEvent(s.ac, key, res.Data, s.userTweetsExpire) + } + return +} + +func (s *looseSrv) userTweetsFromCache(req *web.GetUserTweetsReq, user *cs.VistUser) (res *web.GetUserTweetsResp, key string, ok bool) { + switch req.Style { + case web.UserPostsStylePost, web.UserPostsStyleHighlight, web.UserPostsStyleMedia: + key = fmt.Sprintf("%s%s:%s:%s:%d:%d", s.prefixUserTweets, req.Username, req.Style, user.RelTyp, req.Page, req.PageSize) + default: + visitUserName := lets.If(user.RelTyp != cs.RelationGuest, user.Username, "_") + key = fmt.Sprintf("%s%s:%s:%s:%d:%d", s.prefixUserTweets, req.Username, req.Style, visitUserName, req.Page, req.PageSize) + } + if data, err := s.ac.Get(key); err == nil { + ok, res = true, &web.GetUserTweetsResp{JsonResp: data} + } return } @@ -99,7 +131,7 @@ func (s *looseSrv) getUserStarTweets(req *web.GetUserTweetsReq, user *cs.VistUse return nil, web.ErrGetStarsFailed } resp := base.PageRespFrom(postsFormated, req.Page, req.PageSize, totalRows) - return (*web.GetUserTweetsResp)(resp), nil + return &web.GetUserTweetsResp{Data: resp}, nil } func (s *looseSrv) listUserTweets(req *web.GetUserTweetsReq, user *cs.VistUser) (*web.GetUserTweetsResp, mir.Error) { @@ -126,7 +158,7 @@ func (s *looseSrv) listUserTweets(req *web.GetUserTweetsReq, user *cs.VistUser) return nil, web.ErrGetPostsFailed } resp := base.PageRespFrom(postFormated, req.Page, req.PageSize, total) - return (*web.GetUserTweetsResp)(resp), nil + return &web.GetUserTweetsResp{Data: resp}, nil } func (s *looseSrv) getUserPostTweets(req *web.GetUserTweetsReq, user *cs.VistUser, isHighlight bool) (*web.GetUserTweetsResp, mir.Error) { @@ -160,7 +192,7 @@ func (s *looseSrv) getUserPostTweets(req *web.GetUserTweetsReq, user *cs.VistUse return nil, web.ErrGetPostsFailed } resp := base.PageRespFrom(posts, req.Page, req.PageSize, totalRows) - return (*web.GetUserTweetsResp)(resp), nil + return &web.GetUserTweetsResp{Data: resp}, nil } func (s *looseSrv) GetUserProfile(req *web.GetUserProfileReq) (*web.GetUserProfileResp, mir.Error) { @@ -328,8 +360,11 @@ func (s *looseSrv) TweetComments(req *web.TweetCommentsReq) (*web.TweetCommentsR return (*web.TweetCommentsResp)(resp), nil } -func newLooseSrv(s *base.DaoServant) api.Loose { +func newLooseSrv(s *base.DaoServant, ac core.AppCache) api.Loose { return &looseSrv{ - DaoServant: s, + DaoServant: s, + ac: ac, + userTweetsExpire: conf.CacheSetting.UserTweetsExpire, + prefixUserTweets: conf.PrefixUserTweets, } } diff --git a/internal/servants/web/relax.go b/internal/servants/web/relax.go index f2fee368..7b6dd71e 100644 --- a/internal/servants/web/relax.go +++ b/internal/servants/web/relax.go @@ -7,6 +7,7 @@ package web import ( "github.com/alimy/mir/v4" "github.com/gin-gonic/gin" + "github.com/redis/rueidis" api "github.com/rocboss/paopao-ce/auto/api/v1" "github.com/rocboss/paopao-ce/internal/core" "github.com/rocboss/paopao-ce/internal/model/web" @@ -35,7 +36,7 @@ func (s *relaxSrv) GetUnreadMsgCount(req *web.GetUnreadMsgCountReq) (*web.GetUnr return &web.GetUnreadMsgCountResp{ JsonResp: data, }, nil - } else { + } else if !rueidis.IsRedisNil(xerr) { logrus.Warnf("GetUnreadMsgCount from cache occurs error: %s", xerr) } // 使用缓存机制特殊处理 diff --git a/internal/servants/web/web.go b/internal/servants/web/web.go index 9d8fe081..14ae8bb1 100644 --- a/internal/servants/web/web.go +++ b/internal/servants/web/web.go @@ -21,6 +21,7 @@ var ( _enablePhoneVerify bool _disallowUserRegister bool _ds core.DataService + _ac core.AppCache _wc core.WebCache _oss core.ObjectStorageService _onceInitial sync.Once @@ -34,7 +35,7 @@ func RouteWeb(e *gin.Engine) { api.RegisterAdminServant(e, newAdminSrv(ds)) api.RegisterCoreServant(e, newCoreSrv(ds, _oss, _wc)) api.RegisterRelaxServant(e, newRelaxSrv(ds, _wc)) - api.RegisterLooseServant(e, newLooseSrv(ds)) + api.RegisterLooseServant(e, newLooseSrv(ds, _ac)) api.RegisterPrivServant(e, newPrivSrv(ds, _oss)) api.RegisterPubServant(e, newPubSrv(ds)) api.RegisterFollowshipServant(e, newFollowshipSrv(ds)) @@ -54,6 +55,7 @@ func lazyInitial() { _disallowUserRegister = cfg.If("Web:DisallowUserRegister") _oss = dao.ObjectStorageService() _ds = dao.DataService() + _ac = cache.NewAppCache() _wc = cache.NewWebCache() }) }