Merge pull request #78 from alimy/pr-cache

optimize add cache first number page of post in custom configure
pull/80/head
Michael Li 2 years ago committed by GitHub
commit 70e1c58bb5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -12,9 +12,11 @@ Server: # 服务设置
ReadTimeout: 60
WriteTimeout: 60
Features:
Default: ["Alipay", "Zinc", "MySQL", "Redis", "MinIO", "LoggerFile"]
Develop: ["Sms", "Zinc", "MySQL", "AliOSS", "LoggerZinc"]
Slim: ["Zinc", "MySQL", "Redis", "LocalOSS", "LoggerFile"]
Default: ["Base", "Option", "MinIO", "LoggerFile"]
Develop: ["Base", "Option", "Sms", "AliOSS", "LoggerZinc"]
Slim: ["Base", "LocalOSS", "LoggerFile"]
Base: ["Zinc", "MySQL", "Redis", "Alipay",]
Option: ["CacheIndex"]
Sms: "SmsJuhe"
SmsJuhe:
Key:
@ -23,15 +25,21 @@ SmsJuhe:
Alipay:
AppID:
PrivateKey:
CacheIndex: # 缓存泡泡广场消息流
MaxIndexSize: 200 # 最大缓存条数
CheckTickDuration: 60 # 循环自检查每多少秒一次
ExpireTickDuration: 300 # 每多少秒后强制过期缓存
LoggerFile: # 使用File写日志
SavePath: data/paopao-ce/logs
FileName: app
FileExt: .log
Level: info
LoggerZinc: # 使用Zinc写日志
Host: http://zinc:4080/es/_bulk
Index: paopao-log
User: admin
Password: admin
Level: info
JWT: # 鉴权加密
Secret: 18a6413dc4fe394c66345ebe501b2f26
Issuer: paopao-api

@ -13,17 +13,18 @@ var (
redisSetting *RedisSettingS
features *FeaturesSettingS
ServerSetting *ServerSettingS
AppSetting *AppSettingS
SmsJuheSetting *SmsJuheSettings
AlipaySetting *AlipaySettingS
ZincSetting *ZincSettingS
AliOSSSetting *AliOSSSettingS
MinIOSetting *MinIOSettingS
S3Setting *S3SettingS
LocalOSSSetting *LocalOSSSettingS
JWTSetting *JWTSettingS
Mutex *sync.Mutex
ServerSetting *ServerSettingS
AppSetting *AppSettingS
CacheIndexSetting *CacheIndexSettingS
SmsJuheSetting *SmsJuheSettings
AlipaySetting *AlipaySettingS
ZincSetting *ZincSettingS
AliOSSSetting *AliOSSSettingS
MinIOSetting *MinIOSettingS
S3Setting *S3SettingS
LocalOSSSetting *LocalOSSSettingS
JWTSetting *JWTSettingS
Mutex *sync.Mutex
)
func setupSetting(suite []string, noDefault bool) error {
@ -42,6 +43,7 @@ func setupSetting(suite []string, noDefault bool) error {
objects := map[string]interface{}{
"App": &AppSetting,
"Server": &ServerSetting,
"CacheIndex": &CacheIndexSetting,
"Alipay": &AlipaySetting,
"SmsJuhe": &SmsJuheSetting,
"LoggerFile": &loggerFileSetting,

@ -74,6 +74,9 @@ func setupLogger() {
LocalTime: true,
}
logrus.SetOutput(out)
if level, err := logrus.ParseLevel(loggerFileSetting.Level); err == nil {
logrus.SetLevel(level)
}
} else if CfgIf("LoggerZinc") {
hook := &zincLogHook{
host: loggerZincSetting.Host,
@ -81,6 +84,9 @@ func setupLogger() {
user: loggerZincSetting.User,
password: loggerZincSetting.Password,
}
if level, err := logrus.ParseLevel(loggerZincSetting.Level); err == nil {
logrus.SetLevel(level)
}
logrus.SetOutput(io.Discard)
logrus.AddHook(hook)
}

@ -16,6 +16,7 @@ type LoggerFileSettingS struct {
SavePath string
FileName string
FileExt string
Level string
}
type LoggerZincSettingS struct {
@ -23,6 +24,7 @@ type LoggerZincSettingS struct {
Index string
User string
Password string
Level string
}
type ServerSettingS struct {
@ -44,6 +46,12 @@ type AppSettingS struct {
TronApiKeys []string
}
type CacheIndexSettingS struct {
MaxIndexSize int
CheckTickDuration int
ExpireTickDuration int
}
type AlipaySettingS struct {
AppID string
PrivateKey string
@ -197,7 +205,6 @@ func (f *FeaturesSettingS) Use(suite []string, noDefault bool) error {
}
features := f.flatFeatures(suite)
for _, feature := range features {
feature = strings.ToLower(feature)
if len(feature) == 0 {
continue
}
@ -209,10 +216,13 @@ func (f *FeaturesSettingS) Use(suite []string, noDefault bool) error {
func (f *FeaturesSettingS) flatFeatures(suite []string) []string {
features := make([]string, 0, len(suite)+10)
for s := suite[:]; len(s) > 0; s = s[:len(s)-1] {
if items, exist := f.suites[s[0]]; exist {
s = append(s, items...)
item := strings.TrimSpace(strings.ToLower(s[0]))
if len(item) > 0 {
if items, exist := f.suites[item]; exist {
s = append(s, items...)
}
features = append(features, item)
}
features = append(features, s[0])
s[0] = s[len(s)-1]
}
return features

@ -1,6 +1,38 @@
package core
// CacheService cache service interface that implement base Redis or other
type CacheService interface {
// TODO
import (
"github.com/rocboss/paopao-ce/internal/model"
)
const (
IdxActNop IndexActionT = iota + 1
IdxActCreatePost
IdxActUpdatePost
IdxActDeletePost
IdxActStickPost
)
type IndexActionT uint8
func (a IndexActionT) String() string {
switch a {
case IdxActNop:
return "no operator"
case IdxActCreatePost:
return "create post"
case IdxActUpdatePost:
return "update post"
case IdxActDeletePost:
return "delete post"
case IdxActStickPost:
return "stick post"
default:
return "unknow action"
}
}
// CacheIndexService cache index service interface
type CacheIndexService interface {
IndexPosts(offset int, limit int) ([]*model.PostFormated, bool)
SendAction(active IndexActionT)
}

@ -34,6 +34,7 @@ type DataService interface {
LockPost(post *model.Post) error
StickPost(post *model.Post) error
GetPostByID(id int64) (*model.Post, error)
IndexPosts(offset int, limit int) ([]*model.PostFormated, error)
GetPosts(conditions *model.ConditionsT, offset, limit int) ([]*model.Post, error)
GetPostCount(conditions *model.ConditionsT) (int64, error)
UpdatePost(post *model.Post) error

@ -0,0 +1,86 @@
package dao
import (
"time"
"github.com/rocboss/paopao-ce/internal/conf"
"github.com/rocboss/paopao-ce/internal/core"
"github.com/rocboss/paopao-ce/internal/model"
"github.com/sirupsen/logrus"
)
func newSimpleCacheIndexServant(getIndexPosts func(offset, limit int) ([]*model.PostFormated, error)) *simpleCacheIndexServant {
s := conf.CacheIndexSetting
cacheIndex := &simpleCacheIndexServant{
getIndexPosts: getIndexPosts,
maxIndexSize: s.MaxIndexSize,
indexPosts: make([]*model.PostFormated, 0),
indexActionCh: make(chan core.IndexActionT, 100), // optimize: size need configure by custom
checkTick: time.NewTicker(time.Duration(s.CheckTickDuration) * time.Second), // check whether need update index every 1 minute
expireIndexTick: time.NewTicker(time.Duration(s.ExpireTickDuration) * time.Second), // force expire index every 5 minute
}
// start index posts
cacheIndex.atomicIndex.Store(cacheIndex.indexPosts)
go cacheIndex.startIndexPosts()
return cacheIndex
}
func (s *simpleCacheIndexServant) IndexPosts(offset int, limit int) ([]*model.PostFormated, bool) {
posts := s.atomicIndex.Load().([]*model.PostFormated)
start := offset * limit
end := start + limit
if len(posts) >= end {
logrus.Debugln("get index posts from cached")
return posts[start:end], true
}
return nil, false
}
func (s *simpleCacheIndexServant) SendAction(act core.IndexActionT) {
select {
case s.indexActionCh <- act:
logrus.Debugf("send indexAction by chan: %s", act)
default:
go func(ch chan<- core.IndexActionT, act core.IndexActionT) {
logrus.Debugf("send indexAction by goroutine: %s", act)
ch <- act
}(s.indexActionCh, act)
}
}
func (s *simpleCacheIndexServant) startIndexPosts() {
var err error
for {
select {
case <-s.checkTick.C:
if len(s.indexPosts) == 0 {
logrus.Debugf("index posts by checkTick")
if s.indexPosts, err = s.getIndexPosts(0, s.maxIndexSize); err == nil {
s.atomicIndex.Store(s.indexPosts)
} else {
logrus.Errorf("get index posts err: %v", err)
}
}
case <-s.expireIndexTick.C:
logrus.Debugf("expire index posts by expireIndexTick")
if len(s.indexPosts) != 0 {
s.indexPosts = nil
s.atomicIndex.Store(s.indexPosts)
}
case action := <-s.indexActionCh:
switch action {
case core.IdxActCreatePost, core.IdxActUpdatePost, core.IdxActDeletePost, core.IdxActStickPost:
// prevent many update post in least time
if len(s.indexPosts) != 0 {
logrus.Debugf("remove index posts by action %s", action)
s.indexPosts = nil
s.atomicIndex.Store(s.indexPosts)
}
default:
// nop
}
}
}
}

@ -1,10 +1,14 @@
package dao
import (
"sync/atomic"
"time"
"github.com/aliyun/aliyun-oss-go-sdk/oss"
"github.com/minio/minio-go/v7"
"github.com/rocboss/paopao-ce/internal/conf"
"github.com/rocboss/paopao-ce/internal/core"
"github.com/rocboss/paopao-ce/internal/model"
"github.com/rocboss/paopao-ce/pkg/zinc"
"github.com/sirupsen/logrus"
"gorm.io/gorm"
@ -17,13 +21,27 @@ var (
_ core.ObjectStorageService = (*s3Servant)(nil)
_ core.ObjectStorageService = (*localossServant)(nil)
_ core.AttachmentCheckService = (*attachmentCheckServant)(nil)
_ core.CacheIndexService = (*simpleCacheIndexServant)(nil)
)
type dataServant struct {
useCacheIndex bool
cacheIndex core.CacheIndexService
engine *gorm.DB
zinc *zinc.ZincClient
}
type simpleCacheIndexServant struct {
getIndexPosts func(offset, limit int) ([]*model.PostFormated, error)
indexActionCh chan core.IndexActionT
indexPosts []*model.PostFormated
atomicIndex atomic.Value
maxIndexSize int
checkTick *time.Ticker
expireIndexTick *time.Ticker
}
type localossServant struct {
savePath string
domain string
@ -47,10 +65,20 @@ type attachmentCheckServant struct {
}
func NewDataService(engine *gorm.DB, zinc *zinc.ZincClient) core.DataService {
return &dataServant{
ds := &dataServant{
engine: engine,
zinc: zinc,
}
// initialize CacheIndex if needed
if !conf.CfgIf("CacheIndex") {
ds.useCacheIndex = false
} else {
ds.useCacheIndex = true
ds.cacheIndex = newSimpleCacheIndexServant(ds.getIndexPosts)
}
return ds
}
func NewObjectStorageService() (oss core.ObjectStorageService) {

@ -3,16 +3,26 @@ package dao
import (
"time"
"github.com/rocboss/paopao-ce/internal/core"
"github.com/rocboss/paopao-ce/internal/model"
)
func (d *dataServant) CreatePost(post *model.Post) (*model.Post, error) {
post.LatestRepliedOn = time.Now().Unix()
return post.Create(d.engine)
p, err := post.Create(d.engine)
if err != nil {
return nil, err
}
d.indexActive(core.IdxActCreatePost)
return p, nil
}
func (d *dataServant) DeletePost(post *model.Post) error {
return post.Delete(d.engine)
if err := post.Delete(d.engine); err != nil {
return err
}
d.indexActive(core.IdxActDeletePost)
return nil
}
func (d *dataServant) LockPost(post *model.Post) error {
@ -22,7 +32,11 @@ func (d *dataServant) LockPost(post *model.Post) error {
func (d *dataServant) StickPost(post *model.Post) error {
post.IsTop = 1 - post.IsTop
return post.Update(d.engine)
if err := post.Update(d.engine); err != nil {
return err
}
d.indexActive(core.IdxActStickPost)
return nil
}
func (d *dataServant) GetPostByID(id int64) (*model.Post, error) {
@ -43,7 +57,11 @@ func (d *dataServant) GetPostCount(conditions *model.ConditionsT) (int64, error)
}
func (d *dataServant) UpdatePost(post *model.Post) error {
return post.Update(d.engine)
if err := post.Update(d.engine); err != nil {
return err
}
d.indexActive(core.IdxActUpdatePost)
return nil
}
func (d *dataServant) GetUserPostStar(postID, userID int64) (*model.PostStar, error) {

@ -0,0 +1,74 @@
package dao
import (
"github.com/rocboss/paopao-ce/internal/core"
"github.com/rocboss/paopao-ce/internal/model"
"github.com/sirupsen/logrus"
)
func (d *dataServant) IndexPosts(offset int, limit int) ([]*model.PostFormated, error) {
if d.useCacheIndex {
if posts, ok := d.cacheIndex.IndexPosts(offset, limit); ok {
logrus.Debugln("get index posts from cached")
return posts, nil
}
}
logrus.Debugf("get index posts from database but useCacheIndex: %t", d.useCacheIndex)
return d.getIndexPosts(offset, limit)
}
func (d *dataServant) MergePosts(posts []*model.Post) ([]*model.PostFormated, error) {
postIds := make([]int64, 0, len(posts))
userIds := make([]int64, 0, len(posts))
for _, post := range posts {
postIds = append(postIds, post.ID)
userIds = append(userIds, post.UserID)
}
postContents, err := d.GetPostContentsByIDs(postIds)
if err != nil {
return nil, err
}
users, err := d.GetUsersByIDs(userIds)
if err != nil {
return nil, err
}
userMap := make(map[int64]*model.UserFormated, len(users))
for _, user := range users {
userMap[user.ID] = user.Format()
}
contentMap := make(map[int64][]*model.PostContentFormated, len(postContents))
for _, content := range postContents {
contentMap[content.PostID] = append(contentMap[content.PostID], content.Format())
}
// 数据整合
postsFormated := make([]*model.PostFormated, 0, len(posts))
for _, post := range posts {
postFormated := post.Format()
postFormated.User = userMap[post.UserID]
postFormated.Contents = contentMap[post.ID]
postsFormated = append(postsFormated, postFormated)
}
return postsFormated, nil
}
func (d *dataServant) getIndexPosts(offset int, limit int) ([]*model.PostFormated, error) {
posts, err := (&model.Post{}).List(d.engine, &model.ConditionsT{
"ORDER": "is_top DESC, latest_replied_on DESC",
}, offset, limit)
if err != nil {
logrus.Debugf("getIndexPosts err: %v", err)
return nil, err
}
return d.MergePosts(posts)
}
func (d *dataServant) indexActive(act core.IndexActionT) {
if d.useCacheIndex {
d.cacheIndex.SendAction(act)
}
}

@ -23,14 +23,8 @@ func GetPostList(c *gin.Context) {
}
if q.Query == "" && q.Type == "search" {
// 直接读库
posts, err := service.GetPostList(&service.PostListReq{
Conditions: &model.ConditionsT{
"ORDER": "is_top DESC, latest_replied_on DESC",
},
Offset: (app.GetPage(c) - 1) * app.GetPageSize(c),
Limit: app.GetPageSize(c),
})
offset, limit := app.GetPageOffset(c)
posts, err := service.GetIndexPosts(offset, limit)
if err != nil {
logrus.Errorf("service.GetPostList err: %v\n", err)
response.ToErrorResponse(errcode.GetPostsFailed)

@ -324,6 +324,10 @@ func GetPostContentByID(id int64) (*model.PostContent, error) {
return ds.GetPostContentByID(id)
}
func GetIndexPosts(offset int, limit int) ([]*model.PostFormated, error) {
return ds.IndexPosts(offset, limit)
}
func GetPostList(req *PostListReq) ([]*model.PostFormated, error) {
posts, err := ds.GetPosts(req.Conditions, req.Offset, req.Limit)

@ -27,11 +27,18 @@ func GetPageSize(c *gin.Context) int {
return pageSize
}
func GetPageOffset(page, pageSize int) int {
result := 0
if page > 0 {
result = (page - 1) * pageSize
func GetPageOffset(c *gin.Context) (offset, limit int) {
page := convert.StrTo(c.Query("page")).MustInt()
if page <= 0 {
page = 1
}
return result
limit = convert.StrTo(c.Query("page_size")).MustInt()
if limit <= 0 {
limit = conf.AppSetting.DefaultPageSize
} else if limit > conf.AppSetting.MaxPageSize {
limit = conf.AppSetting.MaxPageSize
}
offset = (page - 1) * limit
return
}

Loading…
Cancel
Save