From 2a44f94f3e21945a4ee8d554dbad5bb79eddaf9a Mon Sep 17 00:00:00 2001 From: Michael Li Date: Mon, 18 Sep 2023 12:57:16 +0800 Subject: [PATCH] optimize update/create/delete tweet logic --- internal/core/cache.go | 4 +- internal/core/core.go | 5 +- internal/core/cs/metrics.go | 25 ++++++++ internal/core/metrics.go | 15 +++++ internal/dao/cache/events.go | 96 ++++++++++++++++++++++++++++++ internal/dao/cache/tweets.go | 49 +++++++++++++++ internal/dao/jinzhu/dbr/metrics.go | 32 ++++++++++ internal/dao/jinzhu/jinzhu.go | 35 ++--------- internal/dao/jinzhu/metrics.go | 45 ++++++++++++++ internal/servants/web/events.go | 24 -------- internal/servants/web/loose.go | 4 +- internal/servants/web/priv.go | 2 - 12 files changed, 275 insertions(+), 61 deletions(-) create mode 100644 internal/core/cs/metrics.go create mode 100644 internal/core/metrics.go create mode 100644 internal/dao/cache/events.go create mode 100644 internal/dao/cache/tweets.go create mode 100644 internal/dao/jinzhu/dbr/metrics.go create mode 100644 internal/dao/jinzhu/metrics.go diff --git a/internal/core/cache.go b/internal/core/cache.go index 3fdcf3fb..bff7ca5c 100644 --- a/internal/core/cache.go +++ b/internal/core/cache.go @@ -68,14 +68,14 @@ func NewIndexActionA(act IdxAct, tweet *cs.TweetInfo) *IndexActionA { // CacheIndexService cache index service interface type CacheIndexService interface { - IndexPostsService + // IndexPostsService SendAction(act IdxAct, post *dbr.Post) } // CacheIndexServantA cache index service interface type CacheIndexServantA interface { - IndexPostsServantA + // IndexPostsServantA SendAction(act IdxAct, tweet *cs.TweetInfo) } diff --git a/internal/core/core.go b/internal/core/core.go index 810d1e97..cb56e9d6 100644 --- a/internal/core/core.go +++ b/internal/core/core.go @@ -16,13 +16,16 @@ type DataService interface { TopicService // 广场泡泡服务 - IndexPostsService + // IndexPostsService // 推文服务 TweetService TweetManageService TweetHelpService + // 推文指标服务 + TweetMetricServantA + // 评论服务 CommentService CommentManageService diff --git a/internal/core/cs/metrics.go b/internal/core/cs/metrics.go new file mode 100644 index 00000000..c8347dfa --- /dev/null +++ b/internal/core/cs/metrics.go @@ -0,0 +1,25 @@ +// Copyright 2023 ROC. All rights reserved. +// Use of this source code is governed by a MIT style +// license that can be found in the LICENSE file. + +// Package cs contain core data service interface type +// model define + +package cs + +type TweetMetric struct { + PostId int64 + CommentCount int64 + UpvoteCount int64 + CollectionCount int64 + ShareCount int64 + ThumbdownCount int64 + ThumbupCount int64 +} + +func (m *TweetMetric) RankScore(motivationFactor int) int64 { + if motivationFactor == 0 { + motivationFactor = 1 + } + return (m.CommentCount + m.UpvoteCount*2 + m.CollectionCount*4 + m.ShareCount*8) * int64(motivationFactor) +} diff --git a/internal/core/metrics.go b/internal/core/metrics.go new file mode 100644 index 00000000..790cee74 --- /dev/null +++ b/internal/core/metrics.go @@ -0,0 +1,15 @@ +// Copyright 2022 ROC. All rights reserved. +// Use of this source code is governed by a MIT style +// license that can be found in the LICENSE file. + +package core + +import ( + "github.com/rocboss/paopao-ce/internal/core/cs" +) + +type TweetMetricServantA interface { + UpdateRankScore(metric *cs.TweetMetric) error + AddTweetMetric(postId int64) error + DeleteTweetMetric(postId int64) error +} diff --git a/internal/dao/cache/events.go b/internal/dao/cache/events.go new file mode 100644 index 00000000..4f137200 --- /dev/null +++ b/internal/dao/cache/events.go @@ -0,0 +1,96 @@ +// Copyright 2022 ROC. All rights reserved. +// Use of this source code is governed by a MIT style +// license that can be found in the LICENSE file. + +package cache + +import ( + "fmt" + + "github.com/alimy/tryst/event" + "github.com/rocboss/paopao-ce/internal/conf" + "github.com/rocboss/paopao-ce/internal/core" + "github.com/rocboss/paopao-ce/internal/core/ms" + "github.com/rocboss/paopao-ce/internal/events" +) + +type expireIndexTweetsEvent struct { + event.UnimplementedEvent + tweet *ms.Post + ac core.AppCache + keysPattern []string +} + +type expireHotsTweetsEvent struct { + event.UnimplementedEvent + tweet *ms.Post + ac core.AppCache + keyPattern string +} + +type expireFollowTweetsEvent struct { + event.UnimplementedEvent + tweet *ms.Post + ac core.AppCache + keyPattern string +} + +func onExpireIndexTweetEvent(tweet *ms.Post) { + events.OnEvent(&expireIndexTweetsEvent{ + tweet: tweet, + ac: _appCache, + keysPattern: []string{ + conf.PrefixIdxTweetsNewest + "*", + conf.PrefixIdxTweetsHots + "*", + fmt.Sprintf("%s%d:*", conf.PrefixUserTweets, tweet.UserID), + }, + }) +} + +func onExpireHotsTweetEvent(tweet *ms.Post) { + events.OnEvent(&expireHotsTweetsEvent{ + tweet: tweet, + ac: _appCache, + keyPattern: conf.PrefixHotsTweets + "*", + }) +} + +func onExpireFollowTweetEvent(tweet *ms.Post) { + events.OnEvent(&expireFollowTweetsEvent{ + tweet: tweet, + ac: _appCache, + keyPattern: conf.PrefixFollowingTweets + "*", + }) +} + +func (e *expireIndexTweetsEvent) Name() string { + return "expireIndexTweetsEvent" +} + +func (e *expireIndexTweetsEvent) Action() (err error) { + // logrus.Debug("expireIndexTweetsEvent action running") + for _, pattern := range e.keysPattern { + e.ac.DelAny(pattern) + } + return +} + +func (e *expireHotsTweetsEvent) Name() string { + return "expireHotsTweetsEvent" +} + +func (e *expireHotsTweetsEvent) Action() (err error) { + // logrus.Debug("expireHotsTweetsEvent action running") + e.ac.DelAny(e.keyPattern) + return +} + +func (e *expireFollowTweetsEvent) Name() string { + return "expireFollowTweetsEvent" +} + +func (e *expireFollowTweetsEvent) Action() (err error) { + // logrus.Debug("expireFollowTweetsEvent action running") + e.ac.DelAny(e.keyPattern) + return +} diff --git a/internal/dao/cache/tweets.go b/internal/dao/cache/tweets.go new file mode 100644 index 00000000..ba89021d --- /dev/null +++ b/internal/dao/cache/tweets.go @@ -0,0 +1,49 @@ +// Copyright 2022 ROC. All rights reserved. +// Use of this source code is governed by a MIT style +// license that can be found in the LICENSE file. + +package cache + +import ( + "github.com/rocboss/paopao-ce/internal/core" + "github.com/rocboss/paopao-ce/internal/core/cs" + "github.com/rocboss/paopao-ce/internal/core/ms" + "github.com/sirupsen/logrus" +) + +type eventCacheIndexSrv struct { + tms core.TweetMetricServantA +} + +func (s *eventCacheIndexSrv) SendAction(act core.IdxAct, post *ms.Post) { + err := error(nil) + switch act { + case core.IdxActUpdatePost: + err = s.tms.UpdateRankScore(&cs.TweetMetric{ + PostId: post.ID, + CommentCount: post.CommentCount, + UpvoteCount: post.UpvoteCount, + CollectionCount: post.CollectionCount, + ShareCount: post.ShareCount, + }) + onExpireIndexTweetEvent(post) + case core.IdxActCreatePost: + err = s.tms.AddTweetMetric(post.ID) + onExpireIndexTweetEvent(post) + case core.IdxActDeletePost: + err = s.tms.DeleteTweetMetric(post.ID) + onExpireIndexTweetEvent(post) + case core.IdxActStickPost, core.IdxActVisiblePost: + onExpireIndexTweetEvent(post) + } + if err != nil { + logrus.Errorf("eventCacheIndexSrv.SendAction(%s) occurs error: %s", act, err) + } +} + +func NewEventCacheIndexSrv(tms core.TweetMetricServantA) core.CacheIndexService { + lazyInitial() + return &eventCacheIndexSrv{ + tms: tms, + } +} diff --git a/internal/dao/jinzhu/dbr/metrics.go b/internal/dao/jinzhu/dbr/metrics.go new file mode 100644 index 00000000..82e105eb --- /dev/null +++ b/internal/dao/jinzhu/dbr/metrics.go @@ -0,0 +1,32 @@ +// Copyright 2023 ROC. All rights reserved. +// Use of this source code is governed by a MIT style +// license that can be found in the LICENSE file. + +package dbr + +import ( + "time" + + "gorm.io/gorm" +) + +type PostMetric struct { + *Model + PostId int64 + RankScore int64 + IncentiveScore int + DecayFactor int + MotivationFactor int +} + +func (p *PostMetric) Create(db *gorm.DB) (*PostMetric, error) { + err := db.Create(&p).Error + return p, err +} + +func (p *PostMetric) Delete(db *gorm.DB) error { + return db.Model(p).Where("post_id", p.PostId).Updates(map[string]any{ + "deleted_on": time.Now().Unix(), + "is_del": 1, + }).Error +} diff --git a/internal/dao/jinzhu/jinzhu.go b/internal/dao/jinzhu/jinzhu.go index 5d4f8333..0645bd5b 100644 --- a/internal/dao/jinzhu/jinzhu.go +++ b/internal/dao/jinzhu/jinzhu.go @@ -12,12 +12,10 @@ import ( "sync" "github.com/Masterminds/semver/v3" - "github.com/alimy/tryst/cfg" "github.com/rocboss/paopao-ce/internal/conf" "github.com/rocboss/paopao-ce/internal/core" "github.com/rocboss/paopao-ce/internal/dao/cache" "github.com/rocboss/paopao-ce/internal/dao/security" - "github.com/sirupsen/logrus" ) var ( @@ -31,13 +29,14 @@ var ( ) type dataSrv struct { - core.IndexPostsService + // core.IndexPostsService core.WalletService core.MessageService core.TopicService core.TweetService core.TweetManageService core.TweetHelpService + core.TweetMetricServantA core.CommentService core.CommentManageService core.UserManageService @@ -57,37 +56,13 @@ type webDataSrvA struct { func NewDataService() (core.DataService, core.VersionInfo) { lazyInitial() - var ( - v core.VersionInfo - cis core.CacheIndexService - ) db := conf.MustGormDB() pvs := security.NewPhoneVerifyService() - ams := NewAuthorizationManageService() - ths := newTweetHelpService(db) - ips := newShipIndexService(db, ams, ths) - - // initialize core.CacheIndexService - cfg.On(cfg.Actions{ - "SimpleCacheIndex": func() { - // simpleCache use special post index service - ips = newSimpleIndexPostsService(db, ths) - cis, v = cache.NewSimpleCacheIndexService(ips) - }, - "BigCacheIndex": func() { - cis, v = cache.NewBigCacheIndexService(ips, ams) - }, - "RedisCacheIndex": func() { - cis, v = cache.NewRedisCacheIndexService(ips, ams) - }, - }, func() { - // defualt no cache - cis, v = cache.NewNoneCacheIndexService(ips) - }) - logrus.Infof("use %s as cache index service by version: %s", v.Name(), v.Version()) + tms := NewTweetMetricServentA(db) + cis := cache.NewEventCacheIndexSrv(tms) ds := &dataSrv{ - IndexPostsService: cis, + TweetMetricServantA: tms, WalletService: newWalletService(db), MessageService: newMessageService(db), TopicService: newTopicService(db), diff --git a/internal/dao/jinzhu/metrics.go b/internal/dao/jinzhu/metrics.go new file mode 100644 index 00000000..129439e9 --- /dev/null +++ b/internal/dao/jinzhu/metrics.go @@ -0,0 +1,45 @@ +// Copyright 2022 ROC. All rights reserved. +// Use of this source code is governed by a MIT style +// license that can be found in the LICENSE file. + +package jinzhu + +import ( + "github.com/rocboss/paopao-ce/internal/core" + "github.com/rocboss/paopao-ce/internal/core/cs" + "github.com/rocboss/paopao-ce/internal/dao/jinzhu/dbr" + "gorm.io/gorm" +) + +type tweetMetricSrvA struct { + db *gorm.DB +} + +func (s *tweetMetricSrvA) UpdateRankScore(metric *cs.TweetMetric) error { + return s.db.Transaction(func(tx *gorm.DB) (err error) { + postMetric := &dbr.PostMetric{} + db := s.db.Model(postMetric).Where("post_id=?", metric.PostId) + if err = db.First(postMetric).Error; err != nil { + return + } + postMetric.RankScore = metric.RankScore(postMetric.MotivationFactor) + err = db.Save(postMetric).Error + return + }) + +} + +func (s *tweetMetricSrvA) AddTweetMetric(postId int64) (err error) { + _, err = (&dbr.PostMetric{}).Create(s.db) + return +} + +func (s *tweetMetricSrvA) DeleteTweetMetric(postId int64) (err error) { + return (&dbr.PostMetric{PostId: postId}).Delete(s.db) +} + +func NewTweetMetricServentA(db *gorm.DB) core.TweetMetricServantA { + return &tweetMetricSrvA{ + db: db, + } +} diff --git a/internal/servants/web/events.go b/internal/servants/web/events.go index fb50e739..f3879d5e 100644 --- a/internal/servants/web/events.go +++ b/internal/servants/web/events.go @@ -14,7 +14,6 @@ import ( "github.com/rocboss/paopao-ce/internal/events" "github.com/rocboss/paopao-ce/internal/model/joint" "github.com/rocboss/paopao-ce/internal/model/web" - "github.com/sirupsen/logrus" ) type cacheUnreadMsgEvent struct { @@ -31,12 +30,6 @@ type createMessageEvent struct { message *ms.Message } -type createTweetEvent struct { - event.UnimplementedEvent - tweet *ms.Post - ac core.AppCache -} - func onCacheUnreadMsgEvent(uid int64) { events.OnEvent(&cacheUnreadMsgEvent{ ds: _ds, @@ -53,13 +46,6 @@ func onCreateMessageEvent(data *ms.Message) { }) } -func onCreateTweetEvent(tweet *ms.Post) { - events.OnEvent(&createTweetEvent{ - ac: _ac, - tweet: tweet, - }) -} - func (e *cacheUnreadMsgEvent) Name() string { return "cacheUnreadMsgEvent" } @@ -100,13 +86,3 @@ func (e *createMessageEvent) Action() (err error) { } return } - -func (e *createTweetEvent) Name() string { - return "createTweetEvent" -} - -func (e *createTweetEvent) Action() (err error) { - // TODO: 过期缓存,重新计算rank等 - logrus.Debug("createTweetEvent post action running") - return -} diff --git a/internal/servants/web/loose.go b/internal/servants/web/loose.go index df74f50c..2ef89d10 100644 --- a/internal/servants/web/loose.go +++ b/internal/servants/web/loose.go @@ -175,10 +175,10 @@ func (s *looseSrv) GetUserTweets(req *web.GetUserTweetsReq) (res *web.GetUserTwe func (s *looseSrv) userTweetsFromCache(req *web.GetUserTweetsReq, user *cs.VistUser) (res *web.GetUserTweetsResp, key string, ok bool) { switch req.Style { case web.UserPostsStylePost, web.UserPostsStyleHighlight, web.UserPostsStyleMedia: - key = fmt.Sprintf("%s%s:%s:%s:%d:%d", s.prefixUserTweets, req.Username, req.Style, user.RelTyp, req.Page, req.PageSize) + key = fmt.Sprintf("%s%d:%s:%s:%d:%d", s.prefixUserTweets, req.User.ID, req.Style, user.RelTyp, req.Page, req.PageSize) default: visitUserName := lets.If(user.RelTyp != cs.RelationGuest, user.Username, "_") - key = fmt.Sprintf("%s%s:%s:%s:%d:%d", s.prefixUserTweets, req.Username, req.Style, visitUserName, req.Page, req.PageSize) + key = fmt.Sprintf("%s%d:%s:%s:%d:%d", s.prefixUserTweets, req.User.ID, req.Style, visitUserName, req.Page, req.PageSize) } if data, err := s.ac.Get(key); err == nil { ok, res = true, &web.GetUserTweetsResp{ diff --git a/internal/servants/web/priv.go b/internal/servants/web/priv.go index 72e3db55..15083388 100644 --- a/internal/servants/web/priv.go +++ b/internal/servants/web/priv.go @@ -309,8 +309,6 @@ func (s *privSrv) CreateTweet(req *web.CreateTweetReq) (_ *web.CreateTweetResp, logrus.Infof("Ds.RevampPosts err: %s", err) return nil, web.ErrCreatePostFailed } - // 发推后处理 - onCreateTweetEvent(post) return (*web.CreateTweetResp)(formatedPosts[0]), nil }