diff --git a/config.yaml.sample b/config.yaml.sample index cd0c55d1..5aa7eb49 100644 --- a/config.yaml.sample +++ b/config.yaml.sample @@ -12,9 +12,11 @@ Server: # 服务设置 ReadTimeout: 60 WriteTimeout: 60 Features: - Default: ["Alipay", "Zinc", "MySQL", "Redis", "MinIO", "LoggerFile"] - Develop: ["Sms", "Zinc", "MySQL", "AliOSS", "LoggerZinc"] - Slim: ["Zinc", "MySQL", "Redis", "LocalOSS", "LoggerFile"] + Default: ["Base", "Option", "MinIO", "LoggerFile"] + Develop: ["Base", "Option", "Sms", "AliOSS", "LoggerZinc"] + Slim: ["Base", "LocalOSS", "LoggerFile"] + Base: ["Zinc", "MySQL", "Redis", "Alipay",] + Option: ["CacheIndex"] Sms: "SmsJuhe" SmsJuhe: Key: @@ -23,15 +25,21 @@ SmsJuhe: Alipay: AppID: PrivateKey: +CacheIndex: # 缓存泡泡广场消息流 + MaxIndexSize: 200 # 最大缓存条数 + CheckTickDuration: 60 # 循环自检查每多少秒一次 + ExpireTickDuration: 300 # 每多少秒后强制过期缓存 LoggerFile: # 使用File写日志 SavePath: data/paopao-ce/logs FileName: app FileExt: .log + Level: info LoggerZinc: # 使用Zinc写日志 Host: http://zinc:4080/es/_bulk Index: paopao-log User: admin Password: admin + Level: info JWT: # 鉴权加密 Secret: 18a6413dc4fe394c66345ebe501b2f26 Issuer: paopao-api diff --git a/internal/conf/conf.go b/internal/conf/conf.go index 9e54807a..4ba67b07 100644 --- a/internal/conf/conf.go +++ b/internal/conf/conf.go @@ -13,17 +13,18 @@ var ( redisSetting *RedisSettingS features *FeaturesSettingS - ServerSetting *ServerSettingS - AppSetting *AppSettingS - SmsJuheSetting *SmsJuheSettings - AlipaySetting *AlipaySettingS - ZincSetting *ZincSettingS - AliOSSSetting *AliOSSSettingS - MinIOSetting *MinIOSettingS - S3Setting *S3SettingS - LocalOSSSetting *LocalOSSSettingS - JWTSetting *JWTSettingS - Mutex *sync.Mutex + ServerSetting *ServerSettingS + AppSetting *AppSettingS + CacheIndexSetting *CacheIndexSettingS + SmsJuheSetting *SmsJuheSettings + AlipaySetting *AlipaySettingS + ZincSetting *ZincSettingS + AliOSSSetting *AliOSSSettingS + MinIOSetting *MinIOSettingS + S3Setting *S3SettingS + LocalOSSSetting *LocalOSSSettingS + JWTSetting *JWTSettingS + Mutex *sync.Mutex ) func setupSetting(suite []string, noDefault bool) error { @@ -42,6 +43,7 @@ func setupSetting(suite []string, noDefault bool) error { objects := map[string]interface{}{ "App": &AppSetting, "Server": &ServerSetting, + "CacheIndex": &CacheIndexSetting, "Alipay": &AlipaySetting, "SmsJuhe": &SmsJuheSetting, "LoggerFile": &loggerFileSetting, diff --git a/internal/conf/logger.go b/internal/conf/logger.go index e7f183fe..31724a6d 100644 --- a/internal/conf/logger.go +++ b/internal/conf/logger.go @@ -74,6 +74,9 @@ func setupLogger() { LocalTime: true, } logrus.SetOutput(out) + if level, err := logrus.ParseLevel(loggerFileSetting.Level); err == nil { + logrus.SetLevel(level) + } } else if CfgIf("LoggerZinc") { hook := &zincLogHook{ host: loggerZincSetting.Host, @@ -81,6 +84,9 @@ func setupLogger() { user: loggerZincSetting.User, password: loggerZincSetting.Password, } + if level, err := logrus.ParseLevel(loggerZincSetting.Level); err == nil { + logrus.SetLevel(level) + } logrus.SetOutput(io.Discard) logrus.AddHook(hook) } diff --git a/internal/conf/settting.go b/internal/conf/settting.go index 8b6f8fbc..3c572b30 100644 --- a/internal/conf/settting.go +++ b/internal/conf/settting.go @@ -16,6 +16,7 @@ type LoggerFileSettingS struct { SavePath string FileName string FileExt string + Level string } type LoggerZincSettingS struct { @@ -23,6 +24,7 @@ type LoggerZincSettingS struct { Index string User string Password string + Level string } type ServerSettingS struct { @@ -44,6 +46,12 @@ type AppSettingS struct { TronApiKeys []string } +type CacheIndexSettingS struct { + MaxIndexSize int + CheckTickDuration int + ExpireTickDuration int +} + type AlipaySettingS struct { AppID string PrivateKey string @@ -197,7 +205,6 @@ func (f *FeaturesSettingS) Use(suite []string, noDefault bool) error { } features := f.flatFeatures(suite) for _, feature := range features { - feature = strings.ToLower(feature) if len(feature) == 0 { continue } @@ -209,10 +216,13 @@ func (f *FeaturesSettingS) Use(suite []string, noDefault bool) error { func (f *FeaturesSettingS) flatFeatures(suite []string) []string { features := make([]string, 0, len(suite)+10) for s := suite[:]; len(s) > 0; s = s[:len(s)-1] { - if items, exist := f.suites[s[0]]; exist { - s = append(s, items...) + item := strings.TrimSpace(strings.ToLower(s[0])) + if len(item) > 0 { + if items, exist := f.suites[item]; exist { + s = append(s, items...) + } + features = append(features, item) } - features = append(features, s[0]) s[0] = s[len(s)-1] } return features diff --git a/internal/core/cache.go b/internal/core/cache.go index a092f182..25394957 100644 --- a/internal/core/cache.go +++ b/internal/core/cache.go @@ -1,6 +1,38 @@ package core -// CacheService cache service interface that implement base Redis or other -type CacheService interface { - // TODO +import ( + "github.com/rocboss/paopao-ce/internal/model" +) + +const ( + IdxActNop IndexActionT = iota + 1 + IdxActCreatePost + IdxActUpdatePost + IdxActDeletePost + IdxActStickPost +) + +type IndexActionT uint8 + +func (a IndexActionT) String() string { + switch a { + case IdxActNop: + return "no operator" + case IdxActCreatePost: + return "create post" + case IdxActUpdatePost: + return "update post" + case IdxActDeletePost: + return "delete post" + case IdxActStickPost: + return "stick post" + default: + return "unknow action" + } +} + +// CacheIndexService cache index service interface +type CacheIndexService interface { + IndexPosts(offset int, limit int) ([]*model.PostFormated, bool) + SendAction(active IndexActionT) } diff --git a/internal/core/core.go b/internal/core/core.go index 7a0b426f..cdd1a92b 100644 --- a/internal/core/core.go +++ b/internal/core/core.go @@ -34,6 +34,7 @@ type DataService interface { LockPost(post *model.Post) error StickPost(post *model.Post) error GetPostByID(id int64) (*model.Post, error) + IndexPosts(offset int, limit int) ([]*model.PostFormated, error) GetPosts(conditions *model.ConditionsT, offset, limit int) ([]*model.Post, error) GetPostCount(conditions *model.ConditionsT) (int64, error) UpdatePost(post *model.Post) error diff --git a/internal/dao/cache_index.go b/internal/dao/cache_index.go new file mode 100644 index 00000000..c457987d --- /dev/null +++ b/internal/dao/cache_index.go @@ -0,0 +1,86 @@ +package dao + +import ( + "time" + + "github.com/rocboss/paopao-ce/internal/conf" + "github.com/rocboss/paopao-ce/internal/core" + "github.com/rocboss/paopao-ce/internal/model" + "github.com/sirupsen/logrus" +) + +func newSimpleCacheIndexServant(getIndexPosts func(offset, limit int) ([]*model.PostFormated, error)) *simpleCacheIndexServant { + s := conf.CacheIndexSetting + cacheIndex := &simpleCacheIndexServant{ + getIndexPosts: getIndexPosts, + maxIndexSize: s.MaxIndexSize, + indexPosts: make([]*model.PostFormated, 0), + indexActionCh: make(chan core.IndexActionT, 100), // optimize: size need configure by custom + checkTick: time.NewTicker(time.Duration(s.CheckTickDuration) * time.Second), // check whether need update index every 1 minute + expireIndexTick: time.NewTicker(time.Duration(s.ExpireTickDuration) * time.Second), // force expire index every 5 minute + } + + // start index posts + cacheIndex.atomicIndex.Store(cacheIndex.indexPosts) + go cacheIndex.startIndexPosts() + + return cacheIndex +} + +func (s *simpleCacheIndexServant) IndexPosts(offset int, limit int) ([]*model.PostFormated, bool) { + posts := s.atomicIndex.Load().([]*model.PostFormated) + start := offset * limit + end := start + limit + if len(posts) >= end { + logrus.Debugln("get index posts from cached") + return posts[start:end], true + } + return nil, false +} + +func (s *simpleCacheIndexServant) SendAction(act core.IndexActionT) { + select { + case s.indexActionCh <- act: + logrus.Debugf("send indexAction by chan: %s", act) + default: + go func(ch chan<- core.IndexActionT, act core.IndexActionT) { + logrus.Debugf("send indexAction by goroutine: %s", act) + ch <- act + }(s.indexActionCh, act) + } +} + +func (s *simpleCacheIndexServant) startIndexPosts() { + var err error + for { + select { + case <-s.checkTick.C: + if len(s.indexPosts) == 0 { + logrus.Debugf("index posts by checkTick") + if s.indexPosts, err = s.getIndexPosts(0, s.maxIndexSize); err == nil { + s.atomicIndex.Store(s.indexPosts) + } else { + logrus.Errorf("get index posts err: %v", err) + } + } + case <-s.expireIndexTick.C: + logrus.Debugf("expire index posts by expireIndexTick") + if len(s.indexPosts) != 0 { + s.indexPosts = nil + s.atomicIndex.Store(s.indexPosts) + } + case action := <-s.indexActionCh: + switch action { + case core.IdxActCreatePost, core.IdxActUpdatePost, core.IdxActDeletePost, core.IdxActStickPost: + // prevent many update post in least time + if len(s.indexPosts) != 0 { + logrus.Debugf("remove index posts by action %s", action) + s.indexPosts = nil + s.atomicIndex.Store(s.indexPosts) + } + default: + // nop + } + } + } +} diff --git a/internal/dao/dao.go b/internal/dao/dao.go index e2ead4d7..df97701b 100644 --- a/internal/dao/dao.go +++ b/internal/dao/dao.go @@ -1,10 +1,14 @@ package dao import ( + "sync/atomic" + "time" + "github.com/aliyun/aliyun-oss-go-sdk/oss" "github.com/minio/minio-go/v7" "github.com/rocboss/paopao-ce/internal/conf" "github.com/rocboss/paopao-ce/internal/core" + "github.com/rocboss/paopao-ce/internal/model" "github.com/rocboss/paopao-ce/pkg/zinc" "github.com/sirupsen/logrus" "gorm.io/gorm" @@ -17,13 +21,27 @@ var ( _ core.ObjectStorageService = (*s3Servant)(nil) _ core.ObjectStorageService = (*localossServant)(nil) _ core.AttachmentCheckService = (*attachmentCheckServant)(nil) + _ core.CacheIndexService = (*simpleCacheIndexServant)(nil) ) type dataServant struct { + useCacheIndex bool + cacheIndex core.CacheIndexService + engine *gorm.DB zinc *zinc.ZincClient } +type simpleCacheIndexServant struct { + getIndexPosts func(offset, limit int) ([]*model.PostFormated, error) + indexActionCh chan core.IndexActionT + indexPosts []*model.PostFormated + atomicIndex atomic.Value + maxIndexSize int + checkTick *time.Ticker + expireIndexTick *time.Ticker +} + type localossServant struct { savePath string domain string @@ -47,10 +65,20 @@ type attachmentCheckServant struct { } func NewDataService(engine *gorm.DB, zinc *zinc.ZincClient) core.DataService { - return &dataServant{ + ds := &dataServant{ engine: engine, zinc: zinc, } + + // initialize CacheIndex if needed + if !conf.CfgIf("CacheIndex") { + ds.useCacheIndex = false + } else { + ds.useCacheIndex = true + ds.cacheIndex = newSimpleCacheIndexServant(ds.getIndexPosts) + } + + return ds } func NewObjectStorageService() (oss core.ObjectStorageService) { diff --git a/internal/dao/post.go b/internal/dao/post.go index 9773a61c..292e16f9 100644 --- a/internal/dao/post.go +++ b/internal/dao/post.go @@ -3,16 +3,26 @@ package dao import ( "time" + "github.com/rocboss/paopao-ce/internal/core" "github.com/rocboss/paopao-ce/internal/model" ) func (d *dataServant) CreatePost(post *model.Post) (*model.Post, error) { post.LatestRepliedOn = time.Now().Unix() - return post.Create(d.engine) + p, err := post.Create(d.engine) + if err != nil { + return nil, err + } + d.indexActive(core.IdxActCreatePost) + return p, nil } func (d *dataServant) DeletePost(post *model.Post) error { - return post.Delete(d.engine) + if err := post.Delete(d.engine); err != nil { + return err + } + d.indexActive(core.IdxActDeletePost) + return nil } func (d *dataServant) LockPost(post *model.Post) error { @@ -22,7 +32,11 @@ func (d *dataServant) LockPost(post *model.Post) error { func (d *dataServant) StickPost(post *model.Post) error { post.IsTop = 1 - post.IsTop - return post.Update(d.engine) + if err := post.Update(d.engine); err != nil { + return err + } + d.indexActive(core.IdxActStickPost) + return nil } func (d *dataServant) GetPostByID(id int64) (*model.Post, error) { @@ -43,7 +57,11 @@ func (d *dataServant) GetPostCount(conditions *model.ConditionsT) (int64, error) } func (d *dataServant) UpdatePost(post *model.Post) error { - return post.Update(d.engine) + if err := post.Update(d.engine); err != nil { + return err + } + d.indexActive(core.IdxActUpdatePost) + return nil } func (d *dataServant) GetUserPostStar(postID, userID int64) (*model.PostStar, error) { diff --git a/internal/dao/post_index.go b/internal/dao/post_index.go new file mode 100644 index 00000000..53a0913d --- /dev/null +++ b/internal/dao/post_index.go @@ -0,0 +1,74 @@ +package dao + +import ( + "github.com/rocboss/paopao-ce/internal/core" + "github.com/rocboss/paopao-ce/internal/model" + "github.com/sirupsen/logrus" +) + +func (d *dataServant) IndexPosts(offset int, limit int) ([]*model.PostFormated, error) { + if d.useCacheIndex { + if posts, ok := d.cacheIndex.IndexPosts(offset, limit); ok { + logrus.Debugln("get index posts from cached") + return posts, nil + } + } + logrus.Debugf("get index posts from database but useCacheIndex: %t", d.useCacheIndex) + return d.getIndexPosts(offset, limit) +} + +func (d *dataServant) MergePosts(posts []*model.Post) ([]*model.PostFormated, error) { + postIds := make([]int64, 0, len(posts)) + userIds := make([]int64, 0, len(posts)) + for _, post := range posts { + postIds = append(postIds, post.ID) + userIds = append(userIds, post.UserID) + } + + postContents, err := d.GetPostContentsByIDs(postIds) + if err != nil { + return nil, err + } + + users, err := d.GetUsersByIDs(userIds) + if err != nil { + return nil, err + } + + userMap := make(map[int64]*model.UserFormated, len(users)) + for _, user := range users { + userMap[user.ID] = user.Format() + } + + contentMap := make(map[int64][]*model.PostContentFormated, len(postContents)) + for _, content := range postContents { + contentMap[content.PostID] = append(contentMap[content.PostID], content.Format()) + } + + // 数据整合 + postsFormated := make([]*model.PostFormated, 0, len(posts)) + for _, post := range posts { + postFormated := post.Format() + postFormated.User = userMap[post.UserID] + postFormated.Contents = contentMap[post.ID] + postsFormated = append(postsFormated, postFormated) + } + return postsFormated, nil +} + +func (d *dataServant) getIndexPosts(offset int, limit int) ([]*model.PostFormated, error) { + posts, err := (&model.Post{}).List(d.engine, &model.ConditionsT{ + "ORDER": "is_top DESC, latest_replied_on DESC", + }, offset, limit) + if err != nil { + logrus.Debugf("getIndexPosts err: %v", err) + return nil, err + } + return d.MergePosts(posts) +} + +func (d *dataServant) indexActive(act core.IndexActionT) { + if d.useCacheIndex { + d.cacheIndex.SendAction(act) + } +} diff --git a/internal/routers/api/post.go b/internal/routers/api/post.go index b595a482..8b4f3703 100644 --- a/internal/routers/api/post.go +++ b/internal/routers/api/post.go @@ -23,14 +23,8 @@ func GetPostList(c *gin.Context) { } if q.Query == "" && q.Type == "search" { - // 直接读库 - posts, err := service.GetPostList(&service.PostListReq{ - Conditions: &model.ConditionsT{ - "ORDER": "is_top DESC, latest_replied_on DESC", - }, - Offset: (app.GetPage(c) - 1) * app.GetPageSize(c), - Limit: app.GetPageSize(c), - }) + offset, limit := app.GetPageOffset(c) + posts, err := service.GetIndexPosts(offset, limit) if err != nil { logrus.Errorf("service.GetPostList err: %v\n", err) response.ToErrorResponse(errcode.GetPostsFailed) diff --git a/internal/service/post.go b/internal/service/post.go index f692fb88..00bc494c 100644 --- a/internal/service/post.go +++ b/internal/service/post.go @@ -324,6 +324,10 @@ func GetPostContentByID(id int64) (*model.PostContent, error) { return ds.GetPostContentByID(id) } +func GetIndexPosts(offset int, limit int) ([]*model.PostFormated, error) { + return ds.IndexPosts(offset, limit) +} + func GetPostList(req *PostListReq) ([]*model.PostFormated, error) { posts, err := ds.GetPosts(req.Conditions, req.Offset, req.Limit) diff --git a/pkg/app/pagination.go b/pkg/app/pagination.go index 332e25c6..1888efb9 100644 --- a/pkg/app/pagination.go +++ b/pkg/app/pagination.go @@ -27,11 +27,18 @@ func GetPageSize(c *gin.Context) int { return pageSize } -func GetPageOffset(page, pageSize int) int { - result := 0 - if page > 0 { - result = (page - 1) * pageSize +func GetPageOffset(c *gin.Context) (offset, limit int) { + page := convert.StrTo(c.Query("page")).MustInt() + if page <= 0 { + page = 1 } - return result + limit = convert.StrTo(c.Query("page_size")).MustInt() + if limit <= 0 { + limit = conf.AppSetting.DefaultPageSize + } else if limit > conf.AppSetting.MaxPageSize { + limit = conf.AppSetting.MaxPageSize + } + offset = (page - 1) * limit + return }