optimize update/create/delete tweet logic

pull/391/head
Michael Li 1 year ago
parent f4f6148a27
commit 2a44f94f3e
No known key found for this signature in database

@ -68,14 +68,14 @@ func NewIndexActionA(act IdxAct, tweet *cs.TweetInfo) *IndexActionA {
// CacheIndexService cache index service interface
type CacheIndexService interface {
IndexPostsService
// IndexPostsService
SendAction(act IdxAct, post *dbr.Post)
}
// CacheIndexServantA cache index service interface
type CacheIndexServantA interface {
IndexPostsServantA
// IndexPostsServantA
SendAction(act IdxAct, tweet *cs.TweetInfo)
}

@ -16,13 +16,16 @@ type DataService interface {
TopicService
// 广场泡泡服务
IndexPostsService
// IndexPostsService
// 推文服务
TweetService
TweetManageService
TweetHelpService
// 推文指标服务
TweetMetricServantA
// 评论服务
CommentService
CommentManageService

@ -0,0 +1,25 @@
// Copyright 2023 ROC. All rights reserved.
// Use of this source code is governed by a MIT style
// license that can be found in the LICENSE file.
// Package cs contain core data service interface type
// model define
package cs
type TweetMetric struct {
PostId int64
CommentCount int64
UpvoteCount int64
CollectionCount int64
ShareCount int64
ThumbdownCount int64
ThumbupCount int64
}
func (m *TweetMetric) RankScore(motivationFactor int) int64 {
if motivationFactor == 0 {
motivationFactor = 1
}
return (m.CommentCount + m.UpvoteCount*2 + m.CollectionCount*4 + m.ShareCount*8) * int64(motivationFactor)
}

@ -0,0 +1,15 @@
// Copyright 2022 ROC. All rights reserved.
// Use of this source code is governed by a MIT style
// license that can be found in the LICENSE file.
package core
import (
"github.com/rocboss/paopao-ce/internal/core/cs"
)
type TweetMetricServantA interface {
UpdateRankScore(metric *cs.TweetMetric) error
AddTweetMetric(postId int64) error
DeleteTweetMetric(postId int64) error
}

@ -0,0 +1,96 @@
// Copyright 2022 ROC. All rights reserved.
// Use of this source code is governed by a MIT style
// license that can be found in the LICENSE file.
package cache
import (
"fmt"
"github.com/alimy/tryst/event"
"github.com/rocboss/paopao-ce/internal/conf"
"github.com/rocboss/paopao-ce/internal/core"
"github.com/rocboss/paopao-ce/internal/core/ms"
"github.com/rocboss/paopao-ce/internal/events"
)
type expireIndexTweetsEvent struct {
event.UnimplementedEvent
tweet *ms.Post
ac core.AppCache
keysPattern []string
}
type expireHotsTweetsEvent struct {
event.UnimplementedEvent
tweet *ms.Post
ac core.AppCache
keyPattern string
}
type expireFollowTweetsEvent struct {
event.UnimplementedEvent
tweet *ms.Post
ac core.AppCache
keyPattern string
}
func onExpireIndexTweetEvent(tweet *ms.Post) {
events.OnEvent(&expireIndexTweetsEvent{
tweet: tweet,
ac: _appCache,
keysPattern: []string{
conf.PrefixIdxTweetsNewest + "*",
conf.PrefixIdxTweetsHots + "*",
fmt.Sprintf("%s%d:*", conf.PrefixUserTweets, tweet.UserID),
},
})
}
func onExpireHotsTweetEvent(tweet *ms.Post) {
events.OnEvent(&expireHotsTweetsEvent{
tweet: tweet,
ac: _appCache,
keyPattern: conf.PrefixHotsTweets + "*",
})
}
func onExpireFollowTweetEvent(tweet *ms.Post) {
events.OnEvent(&expireFollowTweetsEvent{
tweet: tweet,
ac: _appCache,
keyPattern: conf.PrefixFollowingTweets + "*",
})
}
func (e *expireIndexTweetsEvent) Name() string {
return "expireIndexTweetsEvent"
}
func (e *expireIndexTweetsEvent) Action() (err error) {
// logrus.Debug("expireIndexTweetsEvent action running")
for _, pattern := range e.keysPattern {
e.ac.DelAny(pattern)
}
return
}
func (e *expireHotsTweetsEvent) Name() string {
return "expireHotsTweetsEvent"
}
func (e *expireHotsTweetsEvent) Action() (err error) {
// logrus.Debug("expireHotsTweetsEvent action running")
e.ac.DelAny(e.keyPattern)
return
}
func (e *expireFollowTweetsEvent) Name() string {
return "expireFollowTweetsEvent"
}
func (e *expireFollowTweetsEvent) Action() (err error) {
// logrus.Debug("expireFollowTweetsEvent action running")
e.ac.DelAny(e.keyPattern)
return
}

@ -0,0 +1,49 @@
// Copyright 2022 ROC. All rights reserved.
// Use of this source code is governed by a MIT style
// license that can be found in the LICENSE file.
package cache
import (
"github.com/rocboss/paopao-ce/internal/core"
"github.com/rocboss/paopao-ce/internal/core/cs"
"github.com/rocboss/paopao-ce/internal/core/ms"
"github.com/sirupsen/logrus"
)
type eventCacheIndexSrv struct {
tms core.TweetMetricServantA
}
func (s *eventCacheIndexSrv) SendAction(act core.IdxAct, post *ms.Post) {
err := error(nil)
switch act {
case core.IdxActUpdatePost:
err = s.tms.UpdateRankScore(&cs.TweetMetric{
PostId: post.ID,
CommentCount: post.CommentCount,
UpvoteCount: post.UpvoteCount,
CollectionCount: post.CollectionCount,
ShareCount: post.ShareCount,
})
onExpireIndexTweetEvent(post)
case core.IdxActCreatePost:
err = s.tms.AddTweetMetric(post.ID)
onExpireIndexTweetEvent(post)
case core.IdxActDeletePost:
err = s.tms.DeleteTweetMetric(post.ID)
onExpireIndexTweetEvent(post)
case core.IdxActStickPost, core.IdxActVisiblePost:
onExpireIndexTweetEvent(post)
}
if err != nil {
logrus.Errorf("eventCacheIndexSrv.SendAction(%s) occurs error: %s", act, err)
}
}
func NewEventCacheIndexSrv(tms core.TweetMetricServantA) core.CacheIndexService {
lazyInitial()
return &eventCacheIndexSrv{
tms: tms,
}
}

@ -0,0 +1,32 @@
// Copyright 2023 ROC. All rights reserved.
// Use of this source code is governed by a MIT style
// license that can be found in the LICENSE file.
package dbr
import (
"time"
"gorm.io/gorm"
)
type PostMetric struct {
*Model
PostId int64
RankScore int64
IncentiveScore int
DecayFactor int
MotivationFactor int
}
func (p *PostMetric) Create(db *gorm.DB) (*PostMetric, error) {
err := db.Create(&p).Error
return p, err
}
func (p *PostMetric) Delete(db *gorm.DB) error {
return db.Model(p).Where("post_id", p.PostId).Updates(map[string]any{
"deleted_on": time.Now().Unix(),
"is_del": 1,
}).Error
}

@ -12,12 +12,10 @@ import (
"sync"
"github.com/Masterminds/semver/v3"
"github.com/alimy/tryst/cfg"
"github.com/rocboss/paopao-ce/internal/conf"
"github.com/rocboss/paopao-ce/internal/core"
"github.com/rocboss/paopao-ce/internal/dao/cache"
"github.com/rocboss/paopao-ce/internal/dao/security"
"github.com/sirupsen/logrus"
)
var (
@ -31,13 +29,14 @@ var (
)
type dataSrv struct {
core.IndexPostsService
// core.IndexPostsService
core.WalletService
core.MessageService
core.TopicService
core.TweetService
core.TweetManageService
core.TweetHelpService
core.TweetMetricServantA
core.CommentService
core.CommentManageService
core.UserManageService
@ -57,37 +56,13 @@ type webDataSrvA struct {
func NewDataService() (core.DataService, core.VersionInfo) {
lazyInitial()
var (
v core.VersionInfo
cis core.CacheIndexService
)
db := conf.MustGormDB()
pvs := security.NewPhoneVerifyService()
ams := NewAuthorizationManageService()
ths := newTweetHelpService(db)
ips := newShipIndexService(db, ams, ths)
// initialize core.CacheIndexService
cfg.On(cfg.Actions{
"SimpleCacheIndex": func() {
// simpleCache use special post index service
ips = newSimpleIndexPostsService(db, ths)
cis, v = cache.NewSimpleCacheIndexService(ips)
},
"BigCacheIndex": func() {
cis, v = cache.NewBigCacheIndexService(ips, ams)
},
"RedisCacheIndex": func() {
cis, v = cache.NewRedisCacheIndexService(ips, ams)
},
}, func() {
// defualt no cache
cis, v = cache.NewNoneCacheIndexService(ips)
})
logrus.Infof("use %s as cache index service by version: %s", v.Name(), v.Version())
tms := NewTweetMetricServentA(db)
cis := cache.NewEventCacheIndexSrv(tms)
ds := &dataSrv{
IndexPostsService: cis,
TweetMetricServantA: tms,
WalletService: newWalletService(db),
MessageService: newMessageService(db),
TopicService: newTopicService(db),

@ -0,0 +1,45 @@
// Copyright 2022 ROC. All rights reserved.
// Use of this source code is governed by a MIT style
// license that can be found in the LICENSE file.
package jinzhu
import (
"github.com/rocboss/paopao-ce/internal/core"
"github.com/rocboss/paopao-ce/internal/core/cs"
"github.com/rocboss/paopao-ce/internal/dao/jinzhu/dbr"
"gorm.io/gorm"
)
type tweetMetricSrvA struct {
db *gorm.DB
}
func (s *tweetMetricSrvA) UpdateRankScore(metric *cs.TweetMetric) error {
return s.db.Transaction(func(tx *gorm.DB) (err error) {
postMetric := &dbr.PostMetric{}
db := s.db.Model(postMetric).Where("post_id=?", metric.PostId)
if err = db.First(postMetric).Error; err != nil {
return
}
postMetric.RankScore = metric.RankScore(postMetric.MotivationFactor)
err = db.Save(postMetric).Error
return
})
}
func (s *tweetMetricSrvA) AddTweetMetric(postId int64) (err error) {
_, err = (&dbr.PostMetric{}).Create(s.db)
return
}
func (s *tweetMetricSrvA) DeleteTweetMetric(postId int64) (err error) {
return (&dbr.PostMetric{PostId: postId}).Delete(s.db)
}
func NewTweetMetricServentA(db *gorm.DB) core.TweetMetricServantA {
return &tweetMetricSrvA{
db: db,
}
}

@ -14,7 +14,6 @@ import (
"github.com/rocboss/paopao-ce/internal/events"
"github.com/rocboss/paopao-ce/internal/model/joint"
"github.com/rocboss/paopao-ce/internal/model/web"
"github.com/sirupsen/logrus"
)
type cacheUnreadMsgEvent struct {
@ -31,12 +30,6 @@ type createMessageEvent struct {
message *ms.Message
}
type createTweetEvent struct {
event.UnimplementedEvent
tweet *ms.Post
ac core.AppCache
}
func onCacheUnreadMsgEvent(uid int64) {
events.OnEvent(&cacheUnreadMsgEvent{
ds: _ds,
@ -53,13 +46,6 @@ func onCreateMessageEvent(data *ms.Message) {
})
}
func onCreateTweetEvent(tweet *ms.Post) {
events.OnEvent(&createTweetEvent{
ac: _ac,
tweet: tweet,
})
}
func (e *cacheUnreadMsgEvent) Name() string {
return "cacheUnreadMsgEvent"
}
@ -100,13 +86,3 @@ func (e *createMessageEvent) Action() (err error) {
}
return
}
func (e *createTweetEvent) Name() string {
return "createTweetEvent"
}
func (e *createTweetEvent) Action() (err error) {
// TODO: 过期缓存重新计算rank等
logrus.Debug("createTweetEvent post action running")
return
}

@ -175,10 +175,10 @@ func (s *looseSrv) GetUserTweets(req *web.GetUserTweetsReq) (res *web.GetUserTwe
func (s *looseSrv) userTweetsFromCache(req *web.GetUserTweetsReq, user *cs.VistUser) (res *web.GetUserTweetsResp, key string, ok bool) {
switch req.Style {
case web.UserPostsStylePost, web.UserPostsStyleHighlight, web.UserPostsStyleMedia:
key = fmt.Sprintf("%s%s:%s:%s:%d:%d", s.prefixUserTweets, req.Username, req.Style, user.RelTyp, req.Page, req.PageSize)
key = fmt.Sprintf("%s%d:%s:%s:%d:%d", s.prefixUserTweets, req.User.ID, req.Style, user.RelTyp, req.Page, req.PageSize)
default:
visitUserName := lets.If(user.RelTyp != cs.RelationGuest, user.Username, "_")
key = fmt.Sprintf("%s%s:%s:%s:%d:%d", s.prefixUserTweets, req.Username, req.Style, visitUserName, req.Page, req.PageSize)
key = fmt.Sprintf("%s%d:%s:%s:%d:%d", s.prefixUserTweets, req.User.ID, req.Style, visitUserName, req.Page, req.PageSize)
}
if data, err := s.ac.Get(key); err == nil {
ok, res = true, &web.GetUserTweetsResp{

@ -309,8 +309,6 @@ func (s *privSrv) CreateTweet(req *web.CreateTweetReq) (_ *web.CreateTweetResp,
logrus.Infof("Ds.RevampPosts err: %s", err)
return nil, web.ErrCreatePostFailed
}
// 发推后处理
onCreateTweetEvent(post)
return (*web.CreateTweetResp)(formatedPosts[0]), nil
}

Loading…
Cancel
Save