optimize: cache the first pages of the post index, enabled via the custom "CacheIndex" feature in the configuration

pull/78/head
alimy 3 years ago
parent b4d56f37cb
commit abfee0621f

@ -12,9 +12,11 @@ Server: # 服务设置
ReadTimeout: 60
WriteTimeout: 60
Features:
Default: ["Alipay", "Zinc", "MySQL", "Redis", "MinIO", "LoggerFile"]
Develop: ["Sms", "Zinc", "MySQL", "AliOSS", "LoggerZinc"]
Slim: ["Zinc", "MySQL", "Redis", "LocalOSS", "LoggerFile"]
Default: ["Base", "Option", "MinIO", "LoggerFile"]
Develop: ["Base", "Option", "Sms", "AliOSS", "LoggerZinc"]
Slim: ["Base", "LocalOSS", "LoggerFile"]
Base: ["Zinc", "MySQL", "Redis", "Alipay"]
Option: ["CacheIndex"]
Sms: "SmsJuhe"
SmsJuhe:
Key:
@ -23,15 +25,21 @@ SmsJuhe:
Alipay:
AppID:
PrivateKey:
CacheIndex: # 缓存泡泡广场消息流
MaxIndexSize: 200 # 最大缓存条数
CheckTickDuration: 60 # 循环自检查每多少秒一次
ExpireTickDuration: 300 # 每多少秒后强制过期缓存
LoggerFile: # 使用File写日志
SavePath: data/paopao-ce/logs
FileName: app
FileExt: .log
Level: info
LoggerZinc: # 使用Zinc写日志
Host: http://zinc:4080/es/_bulk
Index: paopao-log
User: admin
Password: admin
Level: info
JWT: # 鉴权加密
Secret: 18a6413dc4fe394c66345ebe501b2f26
Issuer: paopao-api

@ -13,17 +13,18 @@ var (
redisSetting *RedisSettingS
features *FeaturesSettingS
ServerSetting *ServerSettingS
AppSetting *AppSettingS
SmsJuheSetting *SmsJuheSettings
AlipaySetting *AlipaySettingS
ZincSetting *ZincSettingS
AliOSSSetting *AliOSSSettingS
MinIOSetting *MinIOSettingS
S3Setting *S3SettingS
LocalOSSSetting *LocalOSSSettingS
JWTSetting *JWTSettingS
Mutex *sync.Mutex
ServerSetting *ServerSettingS
AppSetting *AppSettingS
CacheIndexSetting *CacheIndexSettingS
SmsJuheSetting *SmsJuheSettings
AlipaySetting *AlipaySettingS
ZincSetting *ZincSettingS
AliOSSSetting *AliOSSSettingS
MinIOSetting *MinIOSettingS
S3Setting *S3SettingS
LocalOSSSetting *LocalOSSSettingS
JWTSetting *JWTSettingS
Mutex *sync.Mutex
)
func setupSetting(suite []string, noDefault bool) error {
@ -42,6 +43,7 @@ func setupSetting(suite []string, noDefault bool) error {
objects := map[string]interface{}{
"App": &AppSetting,
"Server": &ServerSetting,
"CacheIndex": &CacheIndexSetting,
"Alipay": &AlipaySetting,
"SmsJuhe": &SmsJuheSetting,
"LoggerFile": &loggerFileSetting,

@ -74,6 +74,9 @@ func setupLogger() {
LocalTime: true,
}
logrus.SetOutput(out)
if level, err := logrus.ParseLevel(loggerFileSetting.Level); err == nil {
logrus.SetLevel(level)
}
} else if CfgIf("LoggerZinc") {
hook := &zincLogHook{
host: loggerZincSetting.Host,
@ -81,6 +84,9 @@ func setupLogger() {
user: loggerZincSetting.User,
password: loggerZincSetting.Password,
}
if level, err := logrus.ParseLevel(loggerZincSetting.Level); err == nil {
logrus.SetLevel(level)
}
logrus.SetOutput(io.Discard)
logrus.AddHook(hook)
}

@ -16,6 +16,7 @@ type LoggerFileSettingS struct {
SavePath string
FileName string
FileExt string
Level string
}
type LoggerZincSettingS struct {
@ -23,6 +24,7 @@ type LoggerZincSettingS struct {
Index string
User string
Password string
Level string
}
type ServerSettingS struct {
@ -44,6 +46,12 @@ type AppSettingS struct {
TronApiKeys []string
}
// CacheIndexSettingS holds configuration for the "CacheIndex" feature,
// which keeps the first pages of the public post index in memory.
type CacheIndexSettingS struct {
	MaxIndexSize       int // maximum number of posts kept in the cache
	CheckTickDuration  int // seconds between cache self-check/rebuild ticks
	ExpireTickDuration int // seconds after which the cache is force-expired
}
type AlipaySettingS struct {
AppID string
PrivateKey string
@ -197,7 +205,6 @@ func (f *FeaturesSettingS) Use(suite []string, noDefault bool) error {
}
features := f.flatFeatures(suite)
for _, feature := range features {
feature = strings.ToLower(feature)
if len(feature) == 0 {
continue
}
@ -209,10 +216,11 @@ func (f *FeaturesSettingS) Use(suite []string, noDefault bool) error {
func (f *FeaturesSettingS) flatFeatures(suite []string) []string {
features := make([]string, 0, len(suite)+10)
for s := suite[:]; len(s) > 0; s = s[:len(s)-1] {
if items, exist := f.suites[s[0]]; exist {
item := strings.ToLower(s[0])
if items, exist := f.suites[item]; exist {
s = append(s, items...)
}
features = append(features, s[0])
features = append(features, item)
s[0] = s[len(s)-1]
}
return features

@ -34,6 +34,7 @@ type DataService interface {
LockPost(post *model.Post) error
StickPost(post *model.Post) error
GetPostByID(id int64) (*model.Post, error)
IndexPosts(offset int, limit int) ([]*model.PostFormated, error)
GetPosts(conditions *model.ConditionsT, offset, limit int) ([]*model.Post, error)
GetPostCount(conditions *model.ConditionsT) (int64, error)
UpdatePost(post *model.Post) error

@ -1,10 +1,14 @@
package dao
import (
"sync/atomic"
"time"
"github.com/aliyun/aliyun-oss-go-sdk/oss"
"github.com/minio/minio-go/v7"
"github.com/rocboss/paopao-ce/internal/conf"
"github.com/rocboss/paopao-ce/internal/core"
"github.com/rocboss/paopao-ce/internal/model"
"github.com/rocboss/paopao-ce/pkg/zinc"
"github.com/sirupsen/logrus"
"gorm.io/gorm"
@ -22,6 +26,14 @@ var (
// dataServant is the gorm + zinc backed implementation of core.DataService.
// The cache-related fields below engine/zinc are only populated when the
// "CacheIndex" feature is enabled (see NewDataService); otherwise
// useCacheIndex is false and they stay zero-valued.
type dataServant struct {
	engine *gorm.DB
	zinc *zinc.ZincClient
	useCacheIndex bool                  // whether the in-memory post-index cache is active
	indexActionCh chan indexActionT     // post-mutation notifications for the cache loop
	indexPosts []*model.PostFormated    // cached index; owned by startIndexPosts
	atomicIndex atomic.Value            // holds []*model.PostFormated for lock-free reads
	maxIndexSize int                    // cap on cached posts (CacheIndex.MaxIndexSize)
	checkTick *time.Ticker              // periodic cache self-check
	expireIndexTick *time.Ticker        // periodic forced cache expiry
}
type localossServant struct {
@ -47,10 +59,31 @@ type attachmentCheckServant struct {
}
func NewDataService(engine *gorm.DB, zinc *zinc.ZincClient) core.DataService {
return &dataServant{
engine: engine,
zinc: zinc,
if !conf.CfgIf("CacheIndex") {
return &dataServant{
engine: engine,
zinc: zinc,
useCacheIndex: false,
}
}
s := conf.CacheIndexSetting
ds := &dataServant{
engine: engine,
zinc: zinc,
useCacheIndex: true,
maxIndexSize: conf.CacheIndexSetting.MaxIndexSize,
indexPosts: make([]*model.PostFormated, 0),
indexActionCh: make(chan indexActionT, 100), // optimize: size need configure by custom
checkTick: time.NewTicker(time.Duration(s.CheckTickDuration) * time.Second), // check whether need update index every 1 minute
expireIndexTick: time.NewTicker(time.Duration(s.ExpireTickDuration) * time.Second), // force expire index every 5 minute
}
// start index posts
ds.atomicIndex.Store(ds.indexPosts)
go ds.startIndexPosts()
return ds
}
func NewObjectStorageService() (oss core.ObjectStorageService) {

@ -8,11 +8,20 @@ import (
// CreatePost stamps the post's latest-reply time, stores it, and — on
// success — notifies the index cache that it must be invalidated.
// (The stale pre-diff `return post.Create(d.engine)` residue is removed.)
func (d *dataServant) CreatePost(post *model.Post) (*model.Post, error) {
	post.LatestRepliedOn = time.Now().Unix()
	p, err := post.Create(d.engine)
	if err != nil {
		return nil, err
	}
	d.indexActive(idxActCreatePost)
	return p, nil
}
// DeletePost removes the post and, on success, invalidates the cached
// post index. (The stale pre-diff `return post.Delete(...)` residue is
// removed.)
func (d *dataServant) DeletePost(post *model.Post) error {
	if err := post.Delete(d.engine); err != nil {
		return err
	}
	d.indexActive(idxActDeletePost)
	return nil
}
func (d *dataServant) LockPost(post *model.Post) error {
@ -22,7 +31,11 @@ func (d *dataServant) LockPost(post *model.Post) error {
// StickPost toggles the post's sticky (top) flag, persists it, and on
// success invalidates the cached post index. (The stale pre-diff
// `return post.Update(...)` residue is removed.)
func (d *dataServant) StickPost(post *model.Post) error {
	post.IsTop = 1 - post.IsTop // flip 0 <-> 1
	if err := post.Update(d.engine); err != nil {
		return err
	}
	d.indexActive(idxActStickPost)
	return nil
}
func (d *dataServant) GetPostByID(id int64) (*model.Post, error) {
@ -43,7 +56,11 @@ func (d *dataServant) GetPostCount(conditions *model.ConditionsT) (int64, error)
}
// UpdatePost persists changes to a post and, on success, invalidates the
// cached post index. (The stale pre-diff `return post.Update(...)` residue
// is removed.)
func (d *dataServant) UpdatePost(post *model.Post) error {
	if err := post.Update(d.engine); err != nil {
		return err
	}
	d.indexActive(idxActUpdatePost)
	return nil
}
func (d *dataServant) GetUserPostStar(postID, userID int64) (*model.PostStar, error) {

@ -0,0 +1,28 @@
package dao
// indexActionT enumerates post mutations that must invalidate the cached
// post index (see startIndexPosts).
type indexActionT uint8

const (
	idxActNop indexActionT = iota + 1 // no-op placeholder
	idxActCreatePost
	idxActUpdatePost
	idxActDeletePost
	idxActStickPost
)

// String implements fmt.Stringer so actions print readably in logs.
func (a indexActionT) String() string {
	switch a {
	case idxActNop:
		return "no operator"
	case idxActCreatePost:
		return "create post"
	case idxActUpdatePost:
		return "update post"
	case idxActDeletePost:
		return "delete post"
	case idxActStickPost:
		return "stick post"
	default:
		// fix: was "unknow action" (typo) in the original.
		return "unknown action"
	}
}

@ -0,0 +1,112 @@
package dao
import (
"github.com/rocboss/paopao-ce/internal/model"
"github.com/sirupsen/logrus"
)
// IndexPosts returns one page of the post index. When the cache-index
// feature is enabled and the requested window is fully covered by the
// cached slice, it is served from memory; otherwise it falls back to the
// database via getIndexPosts.
//
// offset is a row offset (callers pass (page-1)*limit, see GetPageOffset)
// and limit is the page size — the same semantics the DB path uses.
func (d *dataServant) IndexPosts(offset int, limit int) ([]*model.PostFormated, error) {
	if d.useCacheIndex {
		posts := d.atomicIndex.Load().([]*model.PostFormated)
		// fix: offset is already a row offset, so it must not be multiplied
		// by limit again. The original `start := offset * limit` treated
		// offset as a page number on the cache path while the DB fallback
		// treated it as a row offset, returning the wrong window from cache.
		end := offset + limit
		if len(posts) >= end {
			logrus.Debugln("get index posts from cached")
			return posts[offset:end], nil
		}
	}
	logrus.Debugf("get index posts from database but useCacheIndex: %t", d.useCacheIndex)
	return d.getIndexPosts(offset, limit)
}
// MergePosts joins raw posts with their contents and author information,
// producing fully formatted posts ready for API responses.
func (d *dataServant) MergePosts(posts []*model.Post) ([]*model.PostFormated, error) {
	ids := make([]int64, 0, len(posts))
	authorIds := make([]int64, 0, len(posts))
	for _, p := range posts {
		ids = append(ids, p.ID)
		authorIds = append(authorIds, p.UserID)
	}

	contents, err := d.GetPostContentsByIDs(ids)
	if err != nil {
		return nil, err
	}
	authors, err := d.GetUsersByIDs(authorIds)
	if err != nil {
		return nil, err
	}

	// Index authors and contents by their owning IDs for O(1) lookup.
	authorsByID := make(map[int64]*model.UserFormated, len(authors))
	for _, u := range authors {
		authorsByID[u.ID] = u.Format()
	}
	contentsByPost := make(map[int64][]*model.PostContentFormated, len(contents))
	for _, c := range contents {
		contentsByPost[c.PostID] = append(contentsByPost[c.PostID], c.Format())
	}

	// Assemble the formatted posts in the original order.
	merged := make([]*model.PostFormated, 0, len(posts))
	for _, p := range posts {
		f := p.Format()
		f.User = authorsByID[p.UserID]
		f.Contents = contentsByPost[p.ID]
		merged = append(merged, f)
	}
	return merged, nil
}
// getIndexPosts loads one page of the post index straight from the
// database — sticky posts first, then most recent activity — and merges
// in contents and author info.
func (d *dataServant) getIndexPosts(offset int, limit int) ([]*model.PostFormated, error) {
	conditions := &model.ConditionsT{
		"ORDER": "is_top DESC, latest_replied_on DESC",
	}
	posts, err := (&model.Post{}).List(d.engine, conditions, offset, limit)
	if err != nil {
		logrus.Debugf("getIndexPosts err: %v", err)
		return nil, err
	}
	return d.MergePosts(posts)
}
// indexActive notifies the index-maintenance loop (startIndexPosts) that
// a post mutation occurred. It first attempts a non-blocking send; if the
// channel buffer is full it falls back to a goroutine that blocks until
// the consumer catches up, so callers on the write path never stall.
// NOTE(review): the fallback spawns one goroutine per overflowing send —
// under a sustained burst this is unbounded; confirm that is acceptable.
func (d *dataServant) indexActive(act indexActionT) {
	select {
	case d.indexActionCh <- act:
		logrus.Debugf("send indexAction by chan: %s", act)
	default:
		// Buffer full: hand the send off to a goroutine instead of blocking.
		go func(ch chan<- indexActionT, act indexActionT) {
			logrus.Debugf("send indexAction by goroutine: %s", act)
			ch <- act
		}(d.indexActionCh, act)
	}
}
// startIndexPosts is the long-running maintenance loop for the cached post
// index. It is started once from NewDataService and is the sole writer of
// d.indexPosts; readers go through atomicIndex, which this loop re-stores
// after every change. Events:
//   - checkTick: if the cache is currently empty, rebuild it from the DB;
//   - expireIndexTick: unconditionally drop the cache (next check rebuilds);
//   - indexActionCh: any post mutation drops the cache.
// NOTE(review): there is no stop/cancel signal, so this goroutine and its
// tickers live for the process lifetime — confirm that is intended.
func (d *dataServant) startIndexPosts() {
	var err error
	for {
		select {
		case <-d.checkTick.C:
			// Rebuild only when the cache was invalidated or expired.
			if len(d.indexPosts) == 0 {
				logrus.Debugf("index posts by checkTick")
				if d.indexPosts, err = d.getIndexPosts(0, d.maxIndexSize); err == nil {
					d.atomicIndex.Store(d.indexPosts)
				} else {
					logrus.Errorf("get index posts err: %v", err)
				}
			}
		case <-d.expireIndexTick.C:
			// Periodic hard expiry; a later checkTick repopulates the cache.
			logrus.Debugf("expire index posts by expireIndexTick")
			d.indexPosts = nil
			d.atomicIndex.Store(d.indexPosts)
		case action := <-d.indexActionCh:
			switch action {
			case idxActCreatePost, idxActUpdatePost, idxActDeletePost, idxActStickPost:
				// Any write invalidates the whole cached index.
				logrus.Debugf("remove index posts by action %s", action)
				d.indexPosts = nil
				d.atomicIndex.Store(d.indexPosts)
			default:
				// nop
			}
		}
	}
}

@ -24,13 +24,15 @@ func GetPostList(c *gin.Context) {
if q.Query == "" && q.Type == "search" {
// 直接读库
posts, err := service.GetPostList(&service.PostListReq{
Conditions: &model.ConditionsT{
"ORDER": "is_top DESC, latest_replied_on DESC",
},
Offset: (app.GetPage(c) - 1) * app.GetPageSize(c),
Limit: app.GetPageSize(c),
})
// posts, err := service.GetPostList(&service.PostListReq{
// Conditions: &model.ConditionsT{
// "ORDER": "is_top DESC, latest_replied_on DESC",
// },
// Offset: (app.GetPage(c) - 1) * app.GetPageSize(c),
// Limit: app.GetPageSize(c),
// })
offset, limit := app.GetPageOffset(c)
posts, err := service.GetIndexPosts(offset, limit)
if err != nil {
logrus.Errorf("service.GetPostList err: %v\n", err)
response.ToErrorResponse(errcode.GetPostsFailed)

@ -324,6 +324,10 @@ func GetPostContentByID(id int64) (*model.PostContent, error) {
return ds.GetPostContentByID(id)
}
// GetIndexPosts returns one page of the home post index, served from the
// in-memory index cache when the CacheIndex feature is enabled, otherwise
// from the database. offset is a row offset; limit is the page size.
func GetIndexPosts(offset int, limit int) ([]*model.PostFormated, error) {
	return ds.IndexPosts(offset, limit)
}
func GetPostList(req *PostListReq) ([]*model.PostFormated, error) {
posts, err := ds.GetPosts(req.Conditions, req.Offset, req.Limit)

@ -27,11 +27,18 @@ func GetPageSize(c *gin.Context) int {
return pageSize
}
func GetPageOffset(page, pageSize int) int {
result := 0
if page > 0 {
result = (page - 1) * pageSize
func GetPageOffset(c *gin.Context) (offset, limit int) {
page := convert.StrTo(c.Query("page")).MustInt()
if page <= 0 {
page = 1
}
return result
limit = convert.StrTo(c.Query("page_size")).MustInt()
if limit <= 0 {
limit = conf.AppSetting.DefaultPageSize
} else if limit > conf.AppSetting.MaxPageSize {
limit = conf.AppSetting.MaxPageSize
}
offset = (page - 1) * limit
return
}

Loading…
Cancel
Save