From 4172474330011bcf37dd83090f512d3c4f21f31a Mon Sep 17 00:00:00 2001 From: Michael Li Date: Mon, 11 Sep 2023 22:31:59 +0800 Subject: [PATCH] add base app common cache logic for web response that will used for user/profile tweet list --- internal/core/cache.go | 8 +++ internal/dao/cache/cache.go | 8 ++- internal/dao/cache/web.go | 91 +++++++++++++++++++++++--------- internal/servants/base/events.go | 87 ++++++++++++++++++++++++++++++ 4 files changed, 168 insertions(+), 26 deletions(-) diff --git a/internal/core/cache.go b/internal/core/cache.go index 2a0db675..5d30833e 100644 --- a/internal/core/cache.go +++ b/internal/core/cache.go @@ -98,7 +98,15 @@ type RedisCache interface { DelRechargeStatus(ctx context.Context, tradeNo string) error } +type AppCache interface { + Get(key string) ([]byte, error) + Set(key string, data []byte, ex int64) error + Delete(key ...string) error + DelAny(pattern string) error +} + type WebCache interface { + AppCache GetUnreadMsgCountResp(uid int64) ([]byte, error) PutUnreadMsgCountResp(uid int64, data []byte) error DelUnreadMsgCountResp(uid int64) error diff --git a/internal/dao/cache/cache.go b/internal/dao/cache/cache.go index 05f6fce8..2cee6684 100644 --- a/internal/dao/cache/cache.go +++ b/internal/dao/cache/cache.go @@ -58,6 +58,11 @@ func NewWebCache() core.WebCache { return _webCache } +func NewAppCache() core.AppCache { + lazyInitial() + return _appCache +} + func NewSimpleCacheIndexService(indexPosts core.IndexPostsService) (core.CacheIndexService, core.VersionInfo) { s := conf.SimpleCacheIndexSetting cacheIndex := &simpleCacheIndexServant{ @@ -101,6 +106,7 @@ func NewNoneCacheIndexService(indexPosts core.IndexPostsService) (core.CacheInde func lazyInitial() { _onceInit.Do(func() { - _webCache = newWebCache() + _appCache = newAppCache() + _webCache = newWebCache(_appCache) }) } diff --git a/internal/dao/cache/web.go b/internal/dao/cache/web.go index d911c5bc..8fe8ec8c 100644 --- a/internal/dao/cache/web.go +++ b/internal/dao/cache/web.go @@ -8,7 +8,6 @@ import ( "context" "time" - "github.com/Masterminds/semver/v3" "github.com/redis/rueidis" "github.com/rocboss/paopao-ce/internal/conf" "github.com/rocboss/paopao-ce/internal/core" @@ -16,25 +15,21 @@ import ( ) var ( - _webCache core.WebCache = (*redisWebCache)(nil) + _webCache core.WebCache = (*webCache)(nil) + _appCache core.AppCache = (*appCache)(nil) ) -type redisWebCache struct { - cscExpire time.Duration - unreadMsgExpire int64 - c rueidis.Client -} - -func (s *redisWebCache) Name() string { - return "RedisWebCache" +type appCache struct { + cscExpire time.Duration + c rueidis.Client } -func (s *redisWebCache) Version() *semver.Version { - return semver.MustParse("v0.1.0") +type webCache struct { + core.AppCache + unreadMsgExpire int64 } -func (s *redisWebCache) GetUnreadMsgCountResp(uid int64) ([]byte, error) { - key := conf.KeyUnreadMsg.Get(uid) +func (s *appCache) Get(key string) ([]byte, error) { res, err := rueidis.MGetCache(s.c, context.Background(), s.cscExpire, []string{key}) if err != nil { return nil, err @@ -43,24 +38,70 @@ func (s *redisWebCache) GetUnreadMsgCountResp(uid int64) ([]byte, error) { return message.AsBytes() } -func (s *redisWebCache) PutUnreadMsgCountResp(uid int64, data []byte) error { +func (s *appCache) Set(key string, data []byte, ex int64) error { return s.c.Do(context.Background(), s.c.B().Set(). - Key(conf.KeyUnreadMsg.Get(uid)). + Key(key). Value(utils.String(data)). - ExSeconds(s.unreadMsgExpire). + ExSeconds(ex). Build()). Error() } -func (s *redisWebCache) DelUnreadMsgCountResp(uid int64) error { - return s.c.Do(context.Background(), s.c.B().Del().Key(conf.KeyUnreadMsg.Get(uid)).Build()).Error() +func (s *appCache) Delete(keys ...string) (err error) { + if len(keys) != 0 { + err = s.c.Do(context.Background(), s.c.B().Del().Key(keys...).Build()).Error() + } + return +} + +func (s *appCache) DelAny(pattern string) (err error) { + var ( + keys []string + cursor uint64 + entry rueidis.ScanEntry + ) + ctx := context.Background() + for { + cmd := s.c.B().Scan().Cursor(cursor).Match(pattern).Count(50).Build() + if entry, err = s.c.Do(ctx, cmd).AsScanEntry(); err != nil { + return + } + keys = append(keys, entry.Elements...) + if entry.Cursor != 0 { + cursor = entry.Cursor + continue + } + break + } + if len(keys) != 0 { + err = s.c.Do(ctx, s.c.B().Del().Key(keys...).Build()).Error() + } + return +} + +func (s *webCache) GetUnreadMsgCountResp(uid int64) ([]byte, error) { + key := conf.KeyUnreadMsg.Get(uid) + return s.Get(key) +} + +func (s *webCache) PutUnreadMsgCountResp(uid int64, data []byte) error { + return s.Set(conf.KeyUnreadMsg.Get(uid), data, s.unreadMsgExpire) +} + +func (s *webCache) DelUnreadMsgCountResp(uid int64) error { + return s.Delete(conf.KeyUnreadMsg.Get(uid)) +} + +func newAppCache() *appCache { + return &appCache{ + cscExpire: conf.CacheSetting.CientSideCacheExpire, + c: conf.MustRedisClient(), + } } -func newWebCache() *redisWebCache { - s := conf.CacheSetting - return &redisWebCache{ - cscExpire: s.CientSideCacheExpire, - unreadMsgExpire: s.UnreadMsgExpire, - c: conf.MustRedisClient(), +func newWebCache(ac core.AppCache) *webCache { + return &webCache{ + AppCache: ac, + unreadMsgExpire: conf.CacheSetting.UnreadMsgExpire, } } diff --git a/internal/servants/base/events.go b/internal/servants/base/events.go index c5e7d08e..92e0ee8e 100644 --- a/internal/servants/base/events.go +++ b/internal/servants/base/events.go @@ -5,10 +5,36 @@ package base import ( + "encoding/json" + "fmt" + "github.com/alimy/tryst/event" + "github.com/rocboss/paopao-ce/internal/core" "github.com/rocboss/paopao-ce/internal/core/ms" + "github.com/rocboss/paopao-ce/internal/events" + "github.com/rocboss/paopao-ce/internal/model/joint" ) +type CacheRespEvent struct { + event.UnimplementedEvent + ac core.AppCache + key string + data any + expire int64 +} + +type ExpireRespEvent struct { + event.UnimplementedEvent + ac core.AppCache + keys []string +} + +type ExpireAnyRespEvent struct { + event.UnimplementedEvent + ac core.AppCache + pattern string +} + type pushPostToSearchEvent struct { event.UnimplementedEvent fn func(*ms.Post) @@ -20,6 +46,67 @@ type pushAllPostToSearchEvent struct { fn func() error } +func OnCacheRespEvent(ac core.AppCache, key string, data any, expire int64) { + events.OnEvent(&CacheRespEvent{ + ac: ac, + key: key, + data: data, + expire: expire, + }) +} + +func OnExpireRespEvent(ac core.AppCache, keys ...string) { + if len(keys) != 0 { + events.OnEvent(&ExpireRespEvent{ + ac: ac, + keys: keys, + }) + } +} + +func OnExpireAnyRespEvent(ac core.AppCache, pattern string) { + events.OnEvent(&ExpireAnyRespEvent{ + ac: ac, + pattern: pattern, + }) +} + +func (p *CacheRespEvent) Name() string { + return "servants.base.CacheRespEvent" +} + +func (p *CacheRespEvent) Action() error { + resp := &joint.JsonResp{ + Code: 0, + Msg: "success", + Data: p.data, + } + data, err := json.Marshal(resp) + if err != nil { + return fmt.Errorf("CacheRespEvent action marshal resp occurs error: %w", err) + } + if err = p.ac.Set(p.key, data, p.expire); err != nil { + return fmt.Errorf("CacheRespEvent action put resp data to redis cache occurs error: %w", err) + } + return nil +} + +func (p *ExpireRespEvent) Name() string { + return "servants.base.ExpireRespEvent" +} + +func (p *ExpireRespEvent) Action() (err error) { + return p.ac.Delete(p.keys...) +} + +func (p *ExpireAnyRespEvent) Name() string { + return "servants.base.ExpireAnyRespEvent" +} + +func (p *ExpireAnyRespEvent) Action() (err error) { + return p.ac.DelAny(p.pattern) +} + func (p *pushPostToSearchEvent) Name() string { return "servants.base.pushPostToSearchEvent" }