add full support for tweet hots comment logic and add cache support for tweet comment

pull/393/head
Michael Li 1 year ago
parent bb669a9632
commit ba879a2fec
No known key found for this signature in database

@ -94,6 +94,58 @@ All notable changes to paopao-ce are documented in this file.
ReadTimeout: 60
WriteTimeout: 60
...
- add full support for tweet hots comment logic and add cache support for tweet comments.
mirgration database first(sql ddl file in `scripts/migration/**/*_rank_metrics.up.sql`):
```sql
ALTER TABLE `p_comment` ADD COLUMN `reply_count` int unsigned NOT NULL DEFAULT 0 COMMENT '回复数';
UPDATE p_comment comment
SET reply_count = (
SELECT count(*) FROM p_comment_reply reply WHERE reply.comment_id=comment.id AND reply.is_del=0
)
WHERE is_del=0;
CREATE TABLE `p_comment_metric` (
`id` bigint unsigned NOT NULL AUTO_INCREMENT,
`comment_id` bigint unsigned NOT NULL,
`rank_score` bigint unsigned NOT NULL DEFAULT 0,
`incentive_score` int unsigned NOT NULL DEFAULT 0,
`decay_factor` int unsigned NOT NULL DEFAULT 0,
`motivation_factor` int unsigned NOT NULL DEFAULT 0,
`is_del` tinyint NOT NULL DEFAULT 0,
`created_on` bigint unsigned NOT NULL DEFAULT 0,
`modified_on` bigint unsigned NOT NULL DEFAULT 0,
`deleted_on` bigint unsigned NOT NULL DEFAULT 0,
PRIMARY KEY (`id`) USING BTREE,
KEY `idx_comment_metric_comment_id_rank_score` (`comment_id`, `rank_score`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;
INSERT INTO p_comment_metric (comment_id, rank_score, created_on)
SELECT id AS comment_id,
reply_count*2 + thumbs_up_count*4 - thumbs_down_count AS rank_score,
created_on
FROM p_comment
WHERE is_del=0;
CREATE TABLE `p_user_metric` (
`id` bigint unsigned NOT NULL AUTO_INCREMENT,
`user_id` bigint unsigned NOT NULL,
`tweets_count` int unsigned NOT NULL DEFAULT 0,
`latest_trends_on` bigint unsigned NOT NULL DEFAULT 0 COMMENT '最新动态时间',
`is_del` tinyint NOT NULL DEFAULT 0,
`created_on` bigint unsigned NOT NULL DEFAULT 0,
`modified_on` bigint unsigned NOT NULL DEFAULT 0,
`deleted_on` bigint unsigned NOT NULL DEFAULT 0,
PRIMARY KEY (`id`) USING BTREE,
KEY `idx_user_metric_user_id_tweets_count_trends` (`user_id`, `tweets_count`, `latest_trends_on`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;
INSERT INTO p_user_metric (user_id, tweets_count)
SELECT user_id, count(*) AS tweets_count
FROM p_post
WHERE is_del=0
GROUP BY user_id;
```
## 0.4.2
### Fixed

@ -62,7 +62,12 @@ func RegisterLooseServant(e *gin.Engine, s Loose) {
return
}
resp, err := s.TweetComments(req)
s.Render(c, resp, err)
if err != nil {
s.Render(c, nil, err)
return
}
var rv _render_ = resp
rv.Render(c)
})
router.Handle("GET", "/tags", func(c *gin.Context) {
select {

@ -17,6 +17,9 @@ const (
// 以下包含一些在cache中会用到的key的前缀
const (
InfixCommentDefault = "default"
InfixCommentHots = "hots"
InfixCommentNewest = "newest"
PrefixNewestTweets = "paopao:newesttweets:"
PrefixHotsTweets = "paopao:hotstweets:"
PrefixFollowingTweets = "paopao:followingtweets:"
@ -31,6 +34,7 @@ const (
PrefixUserInfoByName = "paopao:userinfo:name:"
PrefixMyFriendIds = "paopao:myfriendids:"
PrefixMyFollowIds = "paopao:myfollowids:"
PrefixTweetComment = "paopao:comment:"
KeySiteStatus = "paopao:sitestatus"
KeyHistoryMaxOnline = "history.max.online"
)

@ -11,6 +11,7 @@ Cache:
UnreadMsgExpire: 60 # 未读消息过期时间,单位秒, 默认60s
UserTweetsExpire: 60 # 获取用户推文列表过期时间,单位秒, 默认60s
IndexTweetsExpire: 120 # 获取广场推文列表过期时间,单位秒, 默认120s
TweetCommentsExpire: 180 # 获取推文评论过期时间,单位秒, 默认180s
OnlineUserExpire: 300 # 标记在线用户 过期时间,单位秒, 默认300s
UserInfoExpire: 300 # 获取用户信息过期时间,单位秒, 默认300s
UserRelationExpire: 600 # 用户关系信息过期时间,单位秒, 默认600s

@ -102,6 +102,7 @@ type cacheConf struct {
UnreadMsgExpire int64
UserTweetsExpire int64
IndexTweetsExpire int64
TweetCommentsExpire int64
OnlineUserExpire int64
UserInfoExpire int64
UserRelationExpire int64

@ -22,6 +22,7 @@ type DataService interface {
// 推文指标服务
TweetMetricServantA
CommentMetricServantA
// 评论服务
CommentService

@ -200,7 +200,6 @@ func (s *commentManageSrv) CreateCommentReply(reply *ms.CommentReply) (res *ms.C
if res, err = reply.Create(s.db); err == nil {
// 宽松处理错误
s.db.Table(_comment_).Where("id=?", reply.CommentID).Update("reply_count", gorm.Expr("reply_count+1"))
onUpdateCommentMetricEvent(reply.CommentID, s.db, s.cms)
}
return
}
@ -222,7 +221,6 @@ func (s *commentManageSrv) DeleteCommentReply(reply *ms.CommentReply) (err error
}
// 宽松处理错误
db.Table(_comment_).Where("id=?", reply.CommentID).Update("reply_count", gorm.Expr("reply_count-1"))
onUpdateCommentMetricEvent(reply.CommentID, s.db, s.cms)
db.Commit()
return
}
@ -277,9 +275,6 @@ func (s *commentManageSrv) ThumbsUpComment(userId int64, tweetId, commentId int6
return err
}
db.Commit()
if err == nil {
onUpdateCommentMetricEvent(commentId, s.db, s.cms)
}
return nil
}
@ -330,9 +325,6 @@ func (s *commentManageSrv) ThumbsDownComment(userId int64, tweetId, commentId in
return err
}
db.Commit()
if err == nil {
onUpdateCommentMetricEvent(commentId, s.db, s.cms)
}
return nil
}

@ -1,49 +0,0 @@
// Copyright 2022 ROC. All rights reserved.
// Use of this source code is governed by a MIT style
// license that can be found in the LICENSE file.
package jinzhu
import (
"github.com/alimy/tryst/event"
"github.com/rocboss/paopao-ce/internal/core"
"github.com/rocboss/paopao-ce/internal/core/cs"
"github.com/rocboss/paopao-ce/internal/dao/jinzhu/dbr"
"github.com/rocboss/paopao-ce/internal/events"
"github.com/sirupsen/logrus"
"gorm.io/gorm"
)
type updateCommentMetricEvent struct {
event.UnimplementedEvent
db *gorm.DB
cms core.CommentMetricServantA
commentId int64
}
func onUpdateCommentMetricEvent(commentId int64, db *gorm.DB, cms core.CommentMetricServantA) {
events.OnEvent(&updateCommentMetricEvent{
db: db,
cms: cms,
commentId: commentId,
})
}
func (e *updateCommentMetricEvent) Name() string {
return "updateCommentMetricEvent"
}
func (e *updateCommentMetricEvent) Action() (err error) {
logrus.Debugf("trigger updateCommentMetricEvent action commentId[%d]", e.commentId)
comment := dbr.Comment{}
if err = e.db.Table(_comment_).Where("id=?", e.commentId).First(&comment).Error; err != nil {
return
}
e.cms.UpdateCommentMetric(&cs.CommentMetric{
CommentId: e.commentId,
ReplyCount: comment.ReplyCount,
ThumbsUpCount: comment.ThumbsUpCount,
ThumbsDownCount: comment.ThumbsDownCount,
})
return
}

@ -63,6 +63,7 @@ func NewDataService() (core.DataService, core.VersionInfo) {
cis := cache.NewEventCacheIndexSrv(tms)
ds := &dataSrv{
TweetMetricServantA: tms,
CommentMetricServantA: cms,
WalletService: newWalletService(db),
MessageService: newMessageService(db),
TopicService: newTopicService(db),

@ -7,6 +7,7 @@ package web
import (
"github.com/alimy/mir/v4"
"github.com/gin-gonic/gin"
"github.com/rocboss/paopao-ce/internal/conf"
"github.com/rocboss/paopao-ce/internal/core"
"github.com/rocboss/paopao-ce/internal/core/cs"
"github.com/rocboss/paopao-ce/internal/core/ms"
@ -46,7 +47,9 @@ type TweetCommentsReq struct {
PageSize int `form:"-" binding:"-"`
}
type TweetCommentsResp base.PageResp
type TweetCommentsResp struct {
joint.CachePageResp
}
type TimelineReq struct {
BaseInfo `form:"-" binding:"-"`
@ -145,3 +148,17 @@ func (s CommentStyleType) ToInnerValue() (res cs.StyleCommentType) {
}
return
}
func (s CommentStyleType) String() (res string) {
switch s {
case "default":
res = conf.InfixCommentDefault
case "hots":
res = conf.InfixCommentHots
case "newest":
res = conf.InfixCommentNewest
default:
res = "_"
}
return
}

@ -9,13 +9,26 @@ import (
"fmt"
"github.com/alimy/tryst/event"
"github.com/rocboss/paopao-ce/internal/conf"
"github.com/rocboss/paopao-ce/internal/core"
"github.com/rocboss/paopao-ce/internal/core/cs"
"github.com/rocboss/paopao-ce/internal/core/ms"
"github.com/rocboss/paopao-ce/internal/events"
"github.com/rocboss/paopao-ce/internal/model/joint"
"github.com/rocboss/paopao-ce/internal/model/web"
)
const (
_commentActionCreate uint8 = iota
_commentActionDelete
_commentActionThumbsUp
_commentActionThumbsDown
_commentActionReplyCreate
_commentActionReplyDelete
_commentActionReplyThumbsUp
_commentActionReplyThumbsDown
)
type cacheUnreadMsgEvent struct {
event.UnimplementedEvent
ds core.DataService
@ -30,6 +43,25 @@ type createMessageEvent struct {
message *ms.Message
}
type commentActionEvent struct {
event.UnimplementedEvent
ds core.DataService
ac core.AppCache
tweetId int64
commentId int64
action uint8
}
func onCommentActionEvent(tweetId int64, commentId int64, action uint8) {
events.OnEvent(&commentActionEvent{
ds: _ds,
ac: _ac,
tweetId: tweetId,
commentId: commentId,
action: action,
})
}
func onCacheUnreadMsgEvent(uid int64) {
events.OnEvent(&cacheUnreadMsgEvent{
ds: _ds,
@ -86,3 +118,51 @@ func (e *createMessageEvent) Action() (err error) {
}
return
}
func (e *commentActionEvent) Name() string {
return "updateCommentMetricEvent"
}
func (e *commentActionEvent) Action() (err error) {
// logrus.Debugf("trigger commentActionEvent action commentId[%d]", e.commentId)
switch e.action {
case _commentActionCreate:
err = e.ds.AddCommentMetric(e.commentId)
e.expireAllStyleComments()
case _commentActionDelete:
err = e.ds.DeleteCommentMetric(e.commentId)
e.expireAllStyleComments()
case _commentActionReplyCreate, _commentActionReplyDelete:
err = e.updateCommentMetric()
e.expireAllStyleComments()
case _commentActionThumbsUp, _commentActionThumbsDown:
err = e.updateCommentMetric()
e.expireHotsComments()
default:
// nothing
}
return
}
func (e *commentActionEvent) expireHotsComments() {
e.ac.DelAny(fmt.Sprintf("%s%d:%s:*", conf.PrefixTweetComment, e.tweetId, conf.InfixCommentHots))
}
func (e *commentActionEvent) expireAllStyleComments() {
e.ac.DelAny(fmt.Sprintf("%s%d:*", conf.PrefixTweetComment, e.tweetId))
}
func (e *commentActionEvent) updateCommentMetric() error {
// logrus.Debug("trigger commentActionEvent action[updateCommentMetric]")
comment, err := e.ds.GetCommentByID(e.commentId)
if err != nil {
return err
}
e.ds.UpdateCommentMetric(&cs.CommentMetric{
CommentId: e.commentId,
ReplyCount: comment.ReplyCount,
ThumbsUpCount: comment.ThumbsUpCount,
ThumbsDownCount: comment.ThumbsDownCount,
})
return nil
}

@ -32,10 +32,12 @@ type looseSrv struct {
ac core.AppCache
userTweetsExpire int64
idxTweetsExpire int64
tweetCommentsExpire int64
prefixUserTweets string
prefixIdxTweetsNewest string
prefixIdxTweetsHots string
prefixIdxTweetsFollowing string
prefixTweetComment string
}
func (s *looseSrv) Chain() gin.HandlersChain {
@ -155,6 +157,18 @@ func (s *looseSrv) indexTweetsFromCache(req *web.TimelineReq, limit int, offset
return
}
func (s *looseSrv) tweetCommentsFromCache(req *web.TweetCommentsReq, limit int, offset int) (res *web.TweetCommentsResp, key string, ok bool) {
key = fmt.Sprintf("%s%d:%s:%d:%d", s.prefixTweetComment, req.TweetId, req.Style, limit, offset)
if data, err := s.ac.Get(key); err == nil {
ok, res = true, &web.TweetCommentsResp{
CachePageResp: joint.CachePageResp{
JsonResp: data,
},
}
}
return
}
func (s *looseSrv) GetUserTweets(req *web.GetUserTweetsReq) (res *web.GetUserTweetsResp, err mir.Error) {
user, xerr := s.RelationTypFrom(req.User, req.Username)
if xerr != nil {
@ -393,9 +407,18 @@ func (s *looseSrv) TopicList(req *web.TopicListReq) (*web.TopicListResp, mir.Err
}, nil
}
func (s *looseSrv) TweetComments(req *web.TweetCommentsReq) (*web.TweetCommentsResp, mir.Error) {
comments, totalRows, err := s.Ds.GetComments(req.TweetId, req.Style.ToInnerValue(), req.PageSize, (req.Page-1)*req.PageSize)
if err != nil {
func (s *looseSrv) TweetComments(req *web.TweetCommentsReq) (res *web.TweetCommentsResp, err mir.Error) {
limit, offset := req.PageSize, (req.Page-1)*req.PageSize
// 尝试直接从缓存中获取数据
key, ok := "", false
if res, key, ok = s.tweetCommentsFromCache(req, limit, offset); ok {
logrus.Debugf("looseSrv.TweetComments from cache key:%s", key)
return
}
comments, totalRows, xerr := s.Ds.GetComments(req.TweetId, req.Style.ToInnerValue(), limit, offset)
if xerr != nil {
logrus.Errorf("looseSrv.TweetComments occurs error[1]: %s", xerr)
return nil, web.ErrGetCommentsFailed
}
@ -406,25 +429,29 @@ func (s *looseSrv) TweetComments(req *web.TweetCommentsReq) (*web.TweetCommentsR
commentIDs = append(commentIDs, comment.ID)
}
users, err := s.Ds.GetUsersByIDs(userIDs)
if err != nil {
users, xerr := s.Ds.GetUsersByIDs(userIDs)
if xerr != nil {
logrus.Errorf("looseSrv.TweetComments occurs error[2]: %s", xerr)
return nil, web.ErrGetCommentsFailed
}
contents, err := s.Ds.GetCommentContentsByIDs(commentIDs)
if err != nil {
contents, xerr := s.Ds.GetCommentContentsByIDs(commentIDs)
if xerr != nil {
logrus.Errorf("looseSrv.TweetComments occurs error[3]: %s", xerr)
return nil, web.ErrGetCommentsFailed
}
replies, err := s.Ds.GetCommentRepliesByID(commentIDs)
if err != nil {
replies, xerr := s.Ds.GetCommentRepliesByID(commentIDs)
if xerr != nil {
logrus.Errorf("looseSrv.TweetComments occurs error[4]: %s", xerr)
return nil, web.ErrGetCommentsFailed
}
var commentThumbs, replyThumbs cs.CommentThumbsMap
if req.Uid > 0 {
commentThumbs, replyThumbs, err = s.Ds.GetCommentThumbsMap(req.Uid, req.TweetId)
if err != nil {
commentThumbs, replyThumbs, xerr = s.Ds.GetCommentThumbsMap(req.Uid, req.TweetId)
if xerr != nil {
logrus.Errorf("looseSrv.TweetComments occurs error[5]: %s", xerr)
return nil, web.ErrGetCommentsFailed
}
}
@ -464,8 +491,14 @@ func (s *looseSrv) TweetComments(req *web.TweetCommentsReq) (*web.TweetCommentsR
}
commentsFormated = append(commentsFormated, commentFormated)
}
resp := base.PageRespFrom(commentsFormated, req.Page, req.PageSize, totalRows)
return (*web.TweetCommentsResp)(resp), nil
resp := joint.PageRespFrom(commentsFormated, req.Page, req.PageSize, totalRows)
// 缓存处理
base.OnCacheRespEvent(s.ac, key, resp, s.tweetCommentsExpire)
return &web.TweetCommentsResp{
CachePageResp: joint.CachePageResp{
Data: resp,
},
}, nil
}
func (s *looseSrv) TweetDetail(req *web.TweetDetailReq) (*web.TweetDetailResp, mir.Error) {
@ -502,9 +535,11 @@ func newLooseSrv(s *base.DaoServant, ac core.AppCache) api.Loose {
ac: ac,
userTweetsExpire: cs.UserTweetsExpire,
idxTweetsExpire: cs.IndexTweetsExpire,
tweetCommentsExpire: cs.TweetCommentsExpire,
prefixUserTweets: conf.PrefixUserTweets,
prefixIdxTweetsNewest: conf.PrefixIdxTweetsNewest,
prefixIdxTweetsHots: conf.PrefixIdxTweetsHots,
prefixIdxTweetsFollowing: conf.PrefixIdxTweetsFollowing,
prefixTweetComment: conf.PrefixTweetComment,
}
}

@ -82,6 +82,8 @@ func (s *privSrv) ThumbsDownTweetComment(req *web.TweetCommentThumbsReq) mir.Err
logrus.Errorf("thumbs down tweet comment error: %s req:%v", err, req)
return web.ErrThumbsDownTweetComment
}
// 缓存处理
onCommentActionEvent(req.TweetId, req.CommentId, _commentActionThumbsDown)
return nil
}
@ -90,6 +92,8 @@ func (s *privSrv) ThumbsUpTweetComment(req *web.TweetCommentThumbsReq) mir.Error
logrus.Errorf("thumbs up tweet comment error: %s req:%v", err, req)
return web.ErrThumbsUpTweetComment
}
// 缓存处理
onCommentActionEvent(req.TweetId, req.CommentId, _commentActionThumbsUp)
return nil
}
@ -355,10 +359,14 @@ func (s *privSrv) DeleteCommentReply(req *web.DeleteCommentReplyReq) mir.Error {
logrus.Errorf("s.deletePostCommentReply err: %s", err)
return web.ErrDeleteCommentFailed
}
// 缓存处理, 宽松处理错误
if comment, err := s.Ds.GetCommentByID(reply.CommentID); err == nil {
onCommentActionEvent(comment.PostID, comment.ID, _commentActionReplyDelete)
}
return nil
}
func (s *privSrv) CreateCommentReply(req *web.CreateCommentReplyReq) (*web.CreateCommentReplyResp, mir.Error) {
func (s *privSrv) CreateCommentReply(req *web.CreateCommentReplyReq) (_ *web.CreateCommentReplyResp, xerr mir.Error) {
var (
post *ms.Post
comment *ms.Comment
@ -433,6 +441,8 @@ func (s *privSrv) CreateCommentReply(req *web.CreateCommentReplyReq) (*web.Creat
})
}
}
// 缓存处理
onCommentActionEvent(comment.PostID, comment.ID, _commentActionReplyCreate)
return (*web.CreateCommentReplyResp)(reply), nil
}
@ -461,6 +471,7 @@ func (s *privSrv) DeleteComment(req *web.DeleteCommentReq) mir.Error {
logrus.Errorf("Ds.DeleteComment err: %s", err)
return web.ErrDeleteCommentFailed
}
onCommentActionEvent(comment.PostID, comment.ID, _commentActionDelete)
return nil
}
@ -565,7 +576,8 @@ func (s *privSrv) CreateComment(req *web.CreateCommentReq) (_ *web.CreateComment
CommentID: comment.ID,
})
}
// 缓存处理
onCommentActionEvent(comment.PostID, comment.ID, _commentActionCreate)
return (*web.CreateCommentResp)(comment), nil
}

Loading…
Cancel
Save