From c922361ccf9dd0d0d1a0b6de9b5a85644c9dc145 Mon Sep 17 00:00:00 2001 From: Michael Li Date: Wed, 13 Sep 2023 06:19:19 +0800 Subject: [PATCH 1/5] add simple audit hook base logic --- README.md | 2 ++ auto/api/v1/priv.go | 34 +++++++++++++++++++++++---- features-status.md | 12 +++++++++- internal/events/events.go | 2 +- internal/model/web/audit.go | 39 +++++++++++++++++++++++++++++++ internal/model/web/priv.go | 15 ++++++++++++ internal/servants/chain/audit.go | 25 ++++++++++++++++++++ internal/servants/chain/events.go | 35 +++++++++++++++++++++++++++ internal/servants/web/priv.go | 25 ++++++++++++++------ internal/servants/web/web.go | 2 +- mirc/web/v1/priv.go | 2 +- 11 files changed, 178 insertions(+), 15 deletions(-) create mode 100644 internal/model/web/audit.go create mode 100644 internal/servants/chain/audit.go create mode 100644 internal/servants/chain/events.go diff --git a/README.md b/README.md index f1ebb53b..e8f98f80 100644 --- a/README.md +++ b/README.md @@ -383,6 +383,8 @@ release/paopao serve --no-default-features --features sqlite3,localoss,loggerfil |[`Pyroscope`](docs/proposal/23021510-关于使用pyroscope用于性能调试的设计.md)| 性能优化 | 内测 | 开启Pyroscope功能用于性能调试 | |[`Pprof`](docs/proposal/23062905-添加Pprof功能特性用于获取Profile.md)| 性能优化 | 内测 | 开启Pprof功能收集Profile信息 | |`PhoneBind` | 其他 | 稳定 | 手机绑定功能 | +|`UseAuditHook` | 其他 | 内测 | 使用审核hook功能 | +|`UseJobManager` | 其他 | 内测 | 使用JobManager功能 | |`Web:DisallowUserRegister` | 功能特性 | 稳定 | 不允许用户注册 | > 功能项状态详情参考 [features-status](features-status.md). diff --git a/auto/api/v1/priv.go b/auto/api/v1/priv.go index f4adccfc..c6142180 100644 --- a/auto/api/v1/priv.go +++ b/auto/api/v1/priv.go @@ -44,8 +44,20 @@ type Priv interface { mustEmbedUnimplementedPrivServant() } +type PrivChain interface { + ChainCreateTweet() gin.HandlersChain + + mustEmbedUnimplementedPrivChain() +} + // RegisterPrivServant register Priv servant to gin -func RegisterPrivServant(e *gin.Engine, s Priv) { +func RegisterPrivServant(e *gin.Engine, s Priv, m ...PrivChain) { + var cc PrivChain + if len(m) > 0 { + cc = m[0] + } else { + cc = &UnimplementedPrivChain{} + } router := e.Group("v1") // use chain for router middlewares := s.Chain() @@ -297,7 +309,7 @@ func RegisterPrivServant(e *gin.Engine, s Priv) { } s.Render(c, nil, s.DeleteTweet(req)) }) - router.Handle("POST", "/post", func(c *gin.Context) { + router.Handle("POST", "/post", append(cc.ChainCreateTweet(), func(c *gin.Context) { select { case <-c.Request.Context().Done(): return @@ -310,8 +322,13 @@ func RegisterPrivServant(e *gin.Engine, s Priv) { return } resp, err := s.CreateTweet(req) - s.Render(c, resp, err) - }) + if err != nil { + s.Render(c, nil, err) + return + } + var rv _render_ = resp + rv.Render(c) + })...) router.Handle("GET", "/attachment", func(c *gin.Context) { select { case <-c.Request.Context().Done(): @@ -455,3 +472,12 @@ func (UnimplementedPrivServant) UploadAttachment(req *web.UploadAttachmentReq) ( } func (UnimplementedPrivServant) mustEmbedUnimplementedPrivServant() {} + +// UnimplementedPrivChain can be embedded to have forward compatible implementations. +type UnimplementedPrivChain struct{} + +func (b *UnimplementedPrivChain) ChainCreateTweet() gin.HandlersChain { + return nil +} + +func (b *UnimplementedPrivChain) mustEmbedUnimplementedPrivChain() {} diff --git a/features-status.md b/features-status.md index b8b0dafc..4949fdc8 100644 --- a/features-status.md +++ b/features-status.md @@ -196,7 +196,17 @@ * `PhoneBind` 手机绑定功能; * [ ] 提按文档 * [x] 接口定义 - * [x] 业务逻辑实现 + * [x] 业务逻辑实现 + +* `UseAuditHook` 使用审核hook功能 (目前状态: 内测 待完善后将转为Builtin) + * [ ] 提按文档 + * [x] 接口定义 + * [x] 业务逻辑实现 + +* `UseJobManager` 使用JobManager功能 (目前状态: 内测 待完善后将转为Builtin) + * [ ] 提按文档 + * [x] 接口定义 + * [x] 业务逻辑实现 ### 功能特性: * `Web:DisallowUserRegister` 不允许用户注册; diff --git a/internal/events/events.go b/internal/events/events.go index 01b9c15c..cccc145d 100644 --- a/internal/events/events.go +++ b/internal/events/events.go @@ -18,7 +18,7 @@ var ( func Initial() { _onceInitial.Do(func() { initEventManager() - if cfg.If("JobManager") { + if cfg.If("UseJobManager") { initJobManager() logrus.Debugln("initial JobManager") } diff --git a/internal/model/web/audit.go b/internal/model/web/audit.go new file mode 100644 index 00000000..5b891177 --- /dev/null +++ b/internal/model/web/audit.go @@ -0,0 +1,39 @@ +// Copyright 2023 ROC. All rights reserved. +// Use of this source code is governed by a MIT style +// license that can be found in the LICENSE file. + +package web + +const ( + AuditStyleUnknown AuditStyle = iota + AuditStyleUserTweet + AuditStyleUserTweetComment + AuditStyleUserTweetReply +) + +const ( + AuditHookCtxKey = "audit_ctx_key" +) + +type AuditStyle uint8 + +type AuditMetaInfo struct { + Style AuditStyle + Id int64 +} + +func (s AuditStyle) String() (res string) { + switch s { + case AuditStyleUserTweet: + res = "UserTweet" + case AuditStyleUserTweetComment: + res = "UserTweetComment" + case AuditStyleUserTweetReply: + res = "UserTweetReply" + case AuditStyleUnknown: + fallthrough + default: + res = "Unknown" + } + return +} diff --git a/internal/model/web/priv.go b/internal/model/web/priv.go index 7ce1aba0..604311b1 100644 --- a/internal/model/web/priv.go +++ b/internal/model/web/priv.go @@ -7,12 +7,14 @@ package web import ( "fmt" "mime/multipart" + "net/http" "strings" "github.com/alimy/mir/v4" "github.com/gin-gonic/gin" "github.com/rocboss/paopao-ce/internal/core" "github.com/rocboss/paopao-ce/internal/core/ms" + "github.com/rocboss/paopao-ce/internal/model/joint" "github.com/rocboss/paopao-ce/internal/servants/base" "github.com/rocboss/paopao-ce/pkg/convert" "github.com/rocboss/paopao-ce/pkg/xerror" @@ -281,3 +283,16 @@ func (r *CreateCommentReq) Bind(c *gin.Context) mir.Error { r.ClientIP = c.ClientIP() return bindAny(c, r) } + +func (r *CreateTweetResp) Render(c *gin.Context) { + c.JSON(http.StatusOK, &joint.JsonResp{ + Code: 0, + Msg: "success", + Data: r, + }) + // 设置审核元信息,用于接下来的审核逻辑 + c.Set(AuditHookCtxKey, &AuditMetaInfo{ + Style: AuditStyleUserTweet, + Id: r.ID, + }) +} diff --git a/internal/servants/chain/audit.go b/internal/servants/chain/audit.go new file mode 100644 index 00000000..01976777 --- /dev/null +++ b/internal/servants/chain/audit.go @@ -0,0 +1,25 @@ +// Copyright 2023 ROC. All rights reserved. +// Use of this source code is governed by a MIT style +// license that can be found in the LICENSE file. + +package chain + +import ( + "github.com/gin-gonic/gin" + "github.com/rocboss/paopao-ce/internal/model/web" +) + +func AuditHook() gin.HandlerFunc { + return func(c *gin.Context) { + // 此midleware后面是真正的http handlder,让handler先执行 + c.Next() + // 审核hook 后处理逻辑 + var ami *web.AuditMetaInfo + if val, ok := c.Get(web.AuditHookCtxKey); ok { + if ami, ok = val.(*web.AuditMetaInfo); !ok { + return + } + } + OnAudiotHookEvent(ami) + } +} diff --git a/internal/servants/chain/events.go b/internal/servants/chain/events.go new file mode 100644 index 00000000..d1d73777 --- /dev/null +++ b/internal/servants/chain/events.go @@ -0,0 +1,35 @@ +// Copyright 2023 ROC. All rights reserved. +// Use of this source code is governed by a MIT style +// license that can be found in the LICENSE file. + +package chain + +import ( + "github.com/alimy/tryst/event" + "github.com/rocboss/paopao-ce/internal/events" + "github.com/rocboss/paopao-ce/internal/model/web" + "github.com/sirupsen/logrus" +) + +type AuditHookEvent struct { + event.UnimplementedEvent + ami *web.AuditMetaInfo +} + +func (e *AuditHookEvent) Name() string { + return "AuditHookEvent" +} + +func (e *AuditHookEvent) Action() error { + // TODO: just log event now, will add real logic in future. + logrus.Debugf("auditHook event action style[%s] id[%d]", e.ami.Style, e.ami.Id) + return nil +} + +func OnAudiotHookEvent(ami *web.AuditMetaInfo) { + if ami != nil { + events.OnEvent(&AuditHookEvent{ + ami: ami, + }) + } +} diff --git a/internal/servants/web/priv.go b/internal/servants/web/priv.go index 99d20f8b..329bbe64 100644 --- a/internal/servants/web/priv.go +++ b/internal/servants/web/priv.go @@ -10,6 +10,7 @@ import ( "time" "github.com/alimy/mir/v4" + "github.com/alimy/tryst/cfg" "github.com/disintegration/imaging" "github.com/gin-gonic/gin" "github.com/gofrs/uuid/v5" @@ -27,7 +28,8 @@ import ( ) var ( - _ api.Priv = (*privSrv)(nil) + _ api.Priv = (*privSrv)(nil) + _ api.PrivChain = (*privChain)(nil) _uploadAttachmentTypeMap = map[string]ms.AttachmentType{ "public/image": ms.AttachmentTypeImage, @@ -35,12 +37,6 @@ var ( "public/video": ms.AttachmentTypeVideo, "attachment": ms.AttachmentTypeOther, } - _uploadAttachmentTypes = map[string]cs.AttachmentType{ - "public/image": cs.AttachmentTypeImage, - "public/avatar": cs.AttachmentTypeImage, - "public/video": cs.AttachmentTypeVideo, - "attachment": cs.AttachmentTypeOther, - } ) type privSrv struct { @@ -50,6 +46,17 @@ type privSrv struct { oss core.ObjectStorageService } +type privChain struct { + api.UnimplementedPrivChain +} + +func (s *privChain) ChainCreateTweet() (res gin.HandlersChain) { + if cfg.If("UseAuditHook") { + res = gin.HandlersChain{chain.AuditHook()} + } + return +} + func (s *privSrv) Chain() gin.HandlersChain { return gin.HandlersChain{chain.JWT(), chain.Priv()} } @@ -846,3 +853,7 @@ func newPrivSrv(s *base.DaoServant, oss core.ObjectStorageService) api.Priv { oss: oss, } } + +func newPrivChain() api.PrivChain { + return &privChain{} +} diff --git a/internal/servants/web/web.go b/internal/servants/web/web.go index 14ae8bb1..e281310a 100644 --- a/internal/servants/web/web.go +++ b/internal/servants/web/web.go @@ -36,7 +36,7 @@ func RouteWeb(e *gin.Engine) { api.RegisterCoreServant(e, newCoreSrv(ds, _oss, _wc)) api.RegisterRelaxServant(e, newRelaxSrv(ds, _wc)) api.RegisterLooseServant(e, newLooseSrv(ds, _ac)) - api.RegisterPrivServant(e, newPrivSrv(ds, _oss)) + api.RegisterPrivServant(e, newPrivSrv(ds, _oss), newPrivChain()) api.RegisterPubServant(e, newPubSrv(ds)) api.RegisterFollowshipServant(e, newFollowshipSrv(ds)) api.RegisterFriendshipServant(e, newFriendshipSrv(ds)) diff --git a/mirc/web/v1/priv.go b/mirc/web/v1/priv.go index 567de00d..758afd31 100644 --- a/mirc/web/v1/priv.go +++ b/mirc/web/v1/priv.go @@ -25,7 +25,7 @@ type Priv struct { DownloadAttachment func(Get, web.DownloadAttachmentReq) web.DownloadAttachmentResp `mir:"/attachment"` // CreateTweet 发布动态 - CreateTweet func(Post, web.CreateTweetReq) web.CreateTweetResp `mir:"/post"` + CreateTweet func(Post, Chain, web.CreateTweetReq) web.CreateTweetResp `mir:"/post"` // DeleteTweet 删除动态 DeleteTweet func(Delete, web.DeleteTweetReq) `mir:"/post"` From 887d09b10bd0777f0816ef0b7c62f5679a4964f0 Mon Sep 17 00:00:00 2001 From: Michael Li Date: Wed, 13 Sep 2023 06:26:37 +0800 Subject: [PATCH 2/5] increase default min worker to 64 in EventManager --- internal/conf/config.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/conf/config.yaml b/internal/conf/config.yaml index 20fe34ad..0593fa73 100644 --- a/internal/conf/config.yaml +++ b/internal/conf/config.yaml @@ -11,7 +11,7 @@ Cache: UnreadMsgExpire: 60 # 未读消息过期时间,单位秒, 默认60s UserTweetsExpire: 60 # 获取用户推文列表过期时间,单位秒, 默认60s EventManager: # 事件管理器的配置参数 - MinWorker: 10 # 最小后台工作者, 设置范围[5, ++], 默认10 + MinWorker: 64 # 最小后台工作者, 设置范围[5, ++], 默认64 MaxEventBuf: 100 # 最大log缓存条数, 设置范围[10, ++], 默认100 MaxTempEventBuf: 100 # 最大log缓存条数, 设置范围[10, ++], 默认100 MaxTickCount: 60 # 最大的循环周期, 设置范围[60, ++], 默认60 From 059cdcddaf56d7455935cf10913b813cda8f7c1c Mon Sep 17 00:00:00 2001 From: Michael Li Date: Wed, 13 Sep 2023 06:29:14 +0800 Subject: [PATCH 3/5] update conf/config.yaml --- internal/conf/config.yaml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/conf/config.yaml b/internal/conf/config.yaml index 0593fa73..fbcdd317 100644 --- a/internal/conf/config.yaml +++ b/internal/conf/config.yaml @@ -12,8 +12,8 @@ Cache: UserTweetsExpire: 60 # 获取用户推文列表过期时间,单位秒, 默认60s EventManager: # 事件管理器的配置参数 MinWorker: 64 # 最小后台工作者, 设置范围[5, ++], 默认64 - MaxEventBuf: 100 # 最大log缓存条数, 设置范围[10, ++], 默认100 - MaxTempEventBuf: 100 # 最大log缓存条数, 设置范围[10, ++], 默认100 + MaxEventBuf: 128 # 最大log缓存条数, 设置范围[10, ++], 默认128 + MaxTempEventBuf: 256 # 最大log缓存条数, 设置范围[10, ++], 默认256 MaxTickCount: 60 # 最大的循环周期, 设置范围[60, ++], 默认60 TickWaitTime: 1 # 一个周期的等待时间,单位:秒 默认1s Features: From 2889228fef04f98daf1695ac65749dff095694e8 Mon Sep 17 00:00:00 2001 From: Michael Li Date: Wed, 13 Sep 2023 12:07:39 +0800 Subject: [PATCH 4/5] upgrade github.com/alimy/tryst => v0.8.3 --- go.mod | 2 +- go.sum | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index b96e0d9b..5d21c9a5 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,7 @@ require ( github.com/Masterminds/semver/v3 v3.2.1 github.com/afocus/captcha v0.0.0-20191010092841-4bd1f21c8868 github.com/alimy/mir/v4 v4.0.0 - github.com/alimy/tryst v0.8.2 + github.com/alimy/tryst v0.8.3 github.com/aliyun/aliyun-oss-go-sdk v2.2.9+incompatible github.com/allegro/bigcache/v3 v3.1.0 github.com/bufbuild/connect-go v1.10.0 diff --git a/go.sum b/go.sum index 4f9b4638..028be948 100644 --- a/go.sum +++ b/go.sum @@ -125,8 +125,8 @@ github.com/alexflint/go-filemutex v0.0.0-20171022225611-72bdc8eae2ae/go.mod h1:C github.com/alexflint/go-filemutex v1.1.0/go.mod h1:7P4iRhttt/nUvUOrYIhcpMzv2G6CY9UnI16Z+UJqRyk= github.com/alimy/mir/v4 v4.0.0 h1:MzGfmoLjjvR69jbZEmpKJO3tUuqB0RGRv1UWPbtukBg= github.com/alimy/mir/v4 v4.0.0/go.mod h1:d58dBvw2KImcVbAUANrciEV/of0arMNsI9c/5UNCMMc= -github.com/alimy/tryst v0.8.2 h1:azu5B58vS6m/ZeHovYGWjVvEOJN2llDIBLxuN3qtMtk= -github.com/alimy/tryst v0.8.2/go.mod h1:ua2eJbFrisHPh7z93Bgc0jNBE8Khu1SCx2p/6t3OzZI= +github.com/alimy/tryst v0.8.3 h1:k54a9YesCGUTqfyDp9NL55TI8CxIj8HNJZyzbIoNab8= +github.com/alimy/tryst v0.8.3/go.mod h1:ua2eJbFrisHPh7z93Bgc0jNBE8Khu1SCx2p/6t3OzZI= github.com/aliyun/aliyun-oss-go-sdk v2.2.9+incompatible h1:Sg/2xHwDrioHpxTN6WMiwbXTpUEinBpHsN7mG21Rc2k= github.com/aliyun/aliyun-oss-go-sdk v2.2.9+incompatible/go.mod h1:T/Aws4fEfogEE9v+HPhhw+CntffsBHJ8nXQCwKr0/g8= github.com/allegro/bigcache/v3 v3.1.0 h1:H2Vp8VOvxcrB91o86fUSVJFqeuz8kpyyB02eH3bSzwk= From 771a942b67d537d8f92f522dfd09fdac0c1f99d9 Mon Sep 17 00:00:00 2001 From: Michael Li Date: Thu, 14 Sep 2023 12:15:07 +0800 Subject: [PATCH 5/5] add base metrics component to measure some server status and add measure online user logic --- auto/api/v1/relax.go | 27 +++++- internal/conf/cache.go | 36 +++++++- internal/conf/conf.go | 3 + internal/conf/config.yaml | 7 ++ internal/conf/setting.go | 9 ++ internal/core/cache.go | 1 + internal/dao/cache/web.go | 21 +++-- internal/events/events.go | 88 ++++++++++++++++++- internal/events/events_tryst.go | 40 +++++++++ internal/events/jobs.go | 46 +--------- internal/internal.go | 3 + .../{events/pool.go => metrics/metrics.go} | 53 +++++++---- internal/metrics/metrics_tryst.go | 32 +++++++ internal/model/web/audit.go | 3 +- internal/servants/base/events.go | 4 +- internal/servants/chain/chain.go | 3 + internal/servants/chain/measure.go | 21 +++++ internal/servants/chain/metrics.go | 36 ++++++++ internal/servants/web/relax.go | 12 +++ internal/servants/web/web.go | 2 +- mirc/web/v1/relax.go | 2 +- pkg/types/types.go | 4 + 22 files changed, 373 insertions(+), 80 deletions(-) create mode 100644 internal/events/events_tryst.go rename internal/{events/pool.go => metrics/metrics.go} (60%) create mode 100644 internal/metrics/metrics_tryst.go create mode 100644 internal/servants/chain/measure.go create mode 100644 internal/servants/chain/metrics.go diff --git a/auto/api/v1/relax.go b/auto/api/v1/relax.go index e88fc8c7..2cd41803 100644 --- a/auto/api/v1/relax.go +++ b/auto/api/v1/relax.go @@ -23,15 +23,27 @@ type Relax interface { mustEmbedUnimplementedRelaxServant() } +type RelaxChain interface { + ChainGetUnreadMsgCount() gin.HandlersChain + + mustEmbedUnimplementedRelaxChain() +} + // RegisterRelaxServant register Relax servant to gin -func RegisterRelaxServant(e *gin.Engine, s Relax) { +func RegisterRelaxServant(e *gin.Engine, s Relax, m ...RelaxChain) { + var cc RelaxChain + if len(m) > 0 { + cc = m[0] + } else { + cc = &UnimplementedRelaxChain{} + } router := e.Group("v1") // use chain for router middlewares := s.Chain() router.Use(middlewares...) // register routes info to router - router.Handle("GET", "/user/msgcount/unread", func(c *gin.Context) { + router.Handle("GET", "/user/msgcount/unread", append(cc.ChainGetUnreadMsgCount(), func(c *gin.Context) { select { case <-c.Request.Context().Done(): return @@ -49,7 +61,7 @@ func RegisterRelaxServant(e *gin.Engine, s Relax) { } var rv _render_ = resp rv.Render(c) - }) + })...) } // UnimplementedRelaxServant can be embedded to have forward compatible implementations. @@ -64,3 +76,12 @@ func (UnimplementedRelaxServant) GetUnreadMsgCount(req *web.GetUnreadMsgCountReq } func (UnimplementedRelaxServant) mustEmbedUnimplementedRelaxServant() {} + +// UnimplementedRelaxChain can be embedded to have forward compatible implementations. +type UnimplementedRelaxChain struct{} + +func (b *UnimplementedRelaxChain) ChainGetUnreadMsgCount() gin.HandlersChain { + return nil +} + +func (b *UnimplementedRelaxChain) mustEmbedUnimplementedRelaxChain() {} diff --git a/internal/conf/cache.go b/internal/conf/cache.go index 8f69a01c..8e3a2a28 100644 --- a/internal/conf/cache.go +++ b/internal/conf/cache.go @@ -8,6 +8,7 @@ import ( "fmt" "github.com/alimy/tryst/cache" + "github.com/rocboss/paopao-ce/pkg/types" ) const ( @@ -16,12 +17,21 @@ const ( // 以下包含一些在cache中会用到的key的前缀 const ( - PrefixUserTweets = "paopao:usertweets:" + PrefixNewestTweets = "paopao:newesttweets:" + PrefixHotsTweets = "paopao:hotstweets:" + PrefixFollowingTweets = "paopao:followingtweets:" + PrefixUserTweets = "paopao:usertweets:" + PrefixUnreadmsg = "paopao:unreadmsg:" + PrefixOnlineUser = "paopao:onlineuser:" ) // 以下包含一些在cache中会用到的池化后的key var ( - KeyUnreadMsg cache.KeyPool[int64] + KeyNewestTweets cache.KeyPool[int] + KeyHotsTweets cache.KeyPool[int] + KeyFollowingTweets cache.KeyPool[string] + KeyUnreadMsg cache.KeyPool[int64] + KeyOnlineUser cache.KeyPool[int64] ) func initCacheKeyPool() { @@ -29,7 +39,25 @@ func initCacheKeyPool() { if poolSize < CacheSetting.KeyPoolSize { poolSize = CacheSetting.KeyPoolSize } - KeyUnreadMsg = cache.MustKeyPool[int64](poolSize, func(key int64) string { - return fmt.Sprintf("paopao:unreadmsg:%d", key) + KeyNewestTweets = intKeyPool[int](poolSize, PrefixNewestTweets) + KeyHotsTweets = intKeyPool[int](poolSize, PrefixHotsTweets) + KeyFollowingTweets = strKeyPool(poolSize, PrefixFollowingTweets) + KeyUnreadMsg = intKeyPool[int64](poolSize, PrefixUnreadmsg) + KeyOnlineUser = intKeyPool[int64](poolSize, PrefixOnlineUser) +} + +func strKeyPool(size int, prefix string) cache.KeyPool[string] { + return cache.MustKeyPool(size, func(key string) string { + return fmt.Sprintf("%s%s", prefix, key) }) } + +func intKeyPool[T types.Integer](size int, prefix string) cache.KeyPool[T] { + return cache.MustKeyPool[T](size, intKey[T](prefix)) +} + +func intKey[T types.Integer](prefix string) func(T) string { + return func(key T) string { + return fmt.Sprintf("%s%d", prefix, key) + } +} diff --git a/internal/conf/conf.go b/internal/conf/conf.go index bc4be72f..7880db81 100644 --- a/internal/conf/conf.go +++ b/internal/conf/conf.go @@ -37,6 +37,7 @@ var ( AppSetting *appConf CacheSetting *cacheConf EventManagerSetting *eventManagerConf + MetricManagerSetting *metricManagerConf CacheIndexSetting *cacheIndexConf SimpleCacheIndexSetting *simpleCacheIndexConf BigCacheIndexSetting *bigCacheIndexConf @@ -73,6 +74,7 @@ func setupSetting(suite []string, noDefault bool) error { "App": &AppSetting, "Cache": &CacheSetting, "EventManager": &EventManagerSetting, + "MetricManager": &MetricManagerSetting, "PprofServer": &PprofServerSetting, "WebServer": &WebServerSetting, "AdminServer": &AdminServerSetting, @@ -121,6 +123,7 @@ func setupSetting(suite []string, noDefault bool) error { CacheSetting.CientSideCacheExpire *= time.Second EventManagerSetting.TickWaitTime *= time.Second + MetricManagerSetting.TickWaitTime *= time.Second JWTSetting.Expire *= time.Second SimpleCacheIndexSetting.CheckTickDuration *= time.Second SimpleCacheIndexSetting.ExpireTickDuration *= time.Second diff --git a/internal/conf/config.yaml b/internal/conf/config.yaml index fbcdd317..e076c42c 100644 --- a/internal/conf/config.yaml +++ b/internal/conf/config.yaml @@ -10,12 +10,19 @@ Cache: CientSideCacheExpire: 60 # 客户端缓存过期时间 默认60s UnreadMsgExpire: 60 # 未读消息过期时间,单位秒, 默认60s UserTweetsExpire: 60 # 获取用户推文列表过期时间,单位秒, 默认60s + OnlineUserExpire: 300 # 标记在线用户 过期时间,单位秒, 默认300s EventManager: # 事件管理器的配置参数 MinWorker: 64 # 最小后台工作者, 设置范围[5, ++], 默认64 MaxEventBuf: 128 # 最大log缓存条数, 设置范围[10, ++], 默认128 MaxTempEventBuf: 256 # 最大log缓存条数, 设置范围[10, ++], 默认256 MaxTickCount: 60 # 最大的循环周期, 设置范围[60, ++], 默认60 TickWaitTime: 1 # 一个周期的等待时间,单位:秒 默认1s +MetricManager: # 指标监控管理器的配置参数 + MinWorker: 32 # 最小后台工作者, 设置范围[5, ++], 默认32 + MaxEventBuf: 128 # 最大log缓存条数, 设置范围[10, ++], 默认128 + MaxTempEventBuf: 256 # 最大log缓存条数, 设置范围[10, ++], 默认256 + MaxTickCount: 60 # 最大的循环周期, 设置范围[60, ++], 默认60 + TickWaitTime: 1 # 一个周期的等待时间,单位:秒 默认1s Features: Default: [] WebServer: # Web服务 diff --git a/internal/conf/setting.go b/internal/conf/setting.go index 0f64f446..30802602 100644 --- a/internal/conf/setting.go +++ b/internal/conf/setting.go @@ -101,6 +101,7 @@ type cacheConf struct { CientSideCacheExpire time.Duration UnreadMsgExpire int64 UserTweetsExpire int64 + OnlineUserExpire int64 } type eventManagerConf struct { @@ -111,6 +112,14 @@ type eventManagerConf struct { TickWaitTime time.Duration } +type metricManagerConf struct { + MinWorker int + MaxEventBuf int + MaxTempEventBuf int + MaxTickCount int + TickWaitTime time.Duration +} + type cacheIndexConf struct { MaxUpdateQPS int MinWorker int diff --git a/internal/core/cache.go b/internal/core/cache.go index 335a62ae..05822527 100644 --- a/internal/core/cache.go +++ b/internal/core/cache.go @@ -101,6 +101,7 @@ type RedisCache interface { type AppCache interface { Get(key string) ([]byte, error) Set(key string, data []byte, ex int64) error + SetNx(key string, data []byte, ex int64) error Delete(key ...string) error DelAny(pattern string) error Exist(key string) bool diff --git a/internal/dao/cache/web.go b/internal/dao/cache/web.go index 6db706c0..b250ef13 100644 --- a/internal/dao/cache/web.go +++ b/internal/dao/cache/web.go @@ -39,12 +39,21 @@ func (s *appCache) Get(key string) ([]byte, error) { } func (s *appCache) Set(key string, data []byte, ex int64) error { - return s.c.Do(context.Background(), s.c.B().Set(). - Key(key). - Value(utils.String(data)). - ExSeconds(ex). - Build()). - Error() + ctx := context.Background() + cmd := s.c.B().Set().Key(key).Value(utils.String(data)) + if ex > 0 { + return s.c.Do(ctx, cmd.ExSeconds(ex).Build()).Error() + } + return s.c.Do(ctx, cmd.Build()).Error() +} + +func (s *appCache) SetNx(key string, data []byte, ex int64) error { + ctx := context.Background() + cmd := s.c.B().Set().Key(key).Value(utils.String(data)).Nx() + if ex > 0 { + return s.c.Do(ctx, cmd.ExSeconds(ex).Build()).Error() + } + return s.c.Do(ctx, cmd.Build()).Error() } func (s *appCache) Delete(keys ...string) (err error) { diff --git a/internal/events/events.go b/internal/events/events.go index cccc145d..f981c93a 100644 --- a/internal/events/events.go +++ b/internal/events/events.go @@ -8,13 +8,68 @@ import ( "sync" "github.com/alimy/tryst/cfg" + "github.com/alimy/tryst/pool" + "github.com/robfig/cron/v3" + "github.com/rocboss/paopao-ce/internal/conf" "github.com/sirupsen/logrus" ) var ( - _onceInitial sync.Once + _defaultEventManager EventManager + _defaultJobManager JobManager + _onceInitial sync.Once ) +func StartEventManager() { + _defaultEventManager.Start() +} + +func StopEventManager() { + _defaultEventManager.Stop() +} + +// OnEvent push event to gorotine pool then handled automatic. +func OnEvent(event Event) { + _defaultEventManager.OnEvent(event) +} + +func StartJobManager() { + _defaultJobManager.Start() +} + +func StopJobManager() { + _defaultJobManager.Stop() +} + +// NewJob create new Job instance +func NewJob(s cron.Schedule, fn JobFn) Job { + return &simpleJob{ + Schedule: s, + Job: fn, + } +} + +// RemoveJob an entry from being run in the future. +func RemoveJob(id EntryID) { + _defaultJobManager.Remove(id) +} + +// ScheduleJob adds a Job to the Cron to be run on the given schedule. +// The job is wrapped with the configured Chain. +func ScheduleJob(job Job) EntryID { + return _defaultJobManager.Schedule(job) +} + +// Schedule adds a Job to the Cron to be run on the given schedule. +// The job is wrapped with the configured Chain. +func Schedule(s cron.Schedule, fn JobFn) EntryID { + job := &simpleJob{ + Schedule: s, + Job: fn, + } + return _defaultJobManager.Schedule(job) +} + func Initial() { _onceInitial.Do(func() { initEventManager() @@ -24,3 +79,34 @@ func Initial() { } }) } + +func initJobManager() { + _defaultJobManager = NewJobManager() + StartJobManager() +} + +func initEventManager() { + var opts []pool.Option + s := conf.EventManagerSetting + if s.MinWorker > 5 { + opts = append(opts, pool.MinWorkerOpt(s.MinWorker)) + } else { + opts = append(opts, pool.MinWorkerOpt(5)) + } + if s.MaxEventBuf > 10 { + opts = append(opts, pool.MaxRequestBufOpt(s.MaxEventBuf)) + } else { + opts = append(opts, pool.MaxRequestBufOpt(10)) + } + if s.MaxTempEventBuf > 10 { + opts = append(opts, pool.MaxRequestTempBufOpt(s.MaxTempEventBuf)) + } else { + opts = append(opts, pool.MaxRequestTempBufOpt(10)) + } + opts = append(opts, pool.MaxTickCountOpt(s.MaxTickCount), pool.TickWaitTimeOpt(s.TickWaitTime)) + _defaultEventManager = NewEventManager(func(req Event, err error) { + if err != nil { + logrus.Errorf("handle event[%s] occurs error: %s", req.Name(), err) + } + }, opts...) +} diff --git a/internal/events/events_tryst.go b/internal/events/events_tryst.go new file mode 100644 index 00000000..0aaac020 --- /dev/null +++ b/internal/events/events_tryst.go @@ -0,0 +1,40 @@ +// Copyright 2023 ROC. All rights reserved. +// Use of this source code is governed by a MIT style +// license that can be found in the LICENSE file. + +package events + +import ( + "github.com/alimy/tryst/event" + "github.com/alimy/tryst/pool" +) + +type Event = event.Event + +type EventManager interface { + Start() + Stop() + OnEvent(event Event) +} + +type simpleEventManager struct { + em event.EventManager +} + +func (s *simpleEventManager) Start() { + s.em.Start() +} + +func (s *simpleEventManager) Stop() { + s.em.Stop() +} + +func (s *simpleEventManager) OnEvent(event Event) { + s.em.OnEvent(event) +} + +func NewEventManager(fn pool.RespFn[Event], opts ...pool.Option) EventManager { + return &simpleEventManager{ + em: event.NewEventManager(fn, opts...), + } +} diff --git a/internal/events/jobs.go b/internal/events/jobs.go index f3cc984a..27b73cac 100644 --- a/internal/events/jobs.go +++ b/internal/events/jobs.go @@ -8,10 +8,6 @@ import ( "github.com/robfig/cron/v3" ) -var ( - _defaultJobManager JobManager = (*jobManager)(nil) -) - type ( EntryID = cron.EntryID ) @@ -65,46 +61,8 @@ func (j *jobManager) Schedule(job Job) EntryID { return j.m.Schedule(job, job) } -func initJobManager() { - _defaultJobManager = &jobManager{ +func NewJobManager() JobManager { + return &jobManager{ m: cron.New(), } - StartJobManager() -} - -func StartJobManager() { - _defaultJobManager.Start() -} - -func StopJobManager() { - _defaultJobManager.Stop() -} - -// NewJob create new Job instance -func NewJob(s cron.Schedule, fn JobFn) Job { - return &simpleJob{ - Schedule: s, - Job: fn, - } -} - -// RemoveJob an entry from being run in the future. -func RemoveJob(id EntryID) { - _defaultJobManager.Remove(id) -} - -// ScheduleJob adds a Job to the Cron to be run on the given schedule. -// The job is wrapped with the configured Chain. -func ScheduleJob(job Job) EntryID { - return _defaultJobManager.Schedule(job) -} - -// Schedule adds a Job to the Cron to be run on the given schedule. -// The job is wrapped with the configured Chain. -func Schedule(s cron.Schedule, fn JobFn) EntryID { - job := &simpleJob{ - Schedule: s, - Job: fn, - } - return _defaultJobManager.Schedule(job) } diff --git a/internal/internal.go b/internal/internal.go index 7bdbb94f..20780a10 100644 --- a/internal/internal.go +++ b/internal/internal.go @@ -6,6 +6,7 @@ package internal import ( "github.com/rocboss/paopao-ce/internal/events" + "github.com/rocboss/paopao-ce/internal/metrics" "github.com/rocboss/paopao-ce/internal/migration" ) @@ -14,4 +15,6 @@ func Initial() { migration.Run() // event manager system initialize events.Initial() + // metric manager system initialize + metrics.Initial() } diff --git a/internal/events/pool.go b/internal/metrics/metrics.go similarity index 60% rename from internal/events/pool.go rename to internal/metrics/metrics.go index f5170591..ffe7d12a 100644 --- a/internal/events/pool.go +++ b/internal/metrics/metrics.go @@ -2,9 +2,11 @@ // Use of this source code is governed by a MIT style // license that can be found in the LICENSE file. -package events +package metrics import ( + "sync" + "github.com/alimy/tryst/event" "github.com/alimy/tryst/pool" "github.com/rocboss/paopao-ce/internal/conf" @@ -12,10 +14,40 @@ import ( ) var ( - _defaultEventManager event.EventManager + _defaultMetricManager event.EventManager + _onceInitial sync.Once ) -func initEventManager() { +type Metric = event.Event + +type BaseMetric = event.UnimplementedEvent + +type MetricManager interface { + Start() + Stop() + OnMeasure(metric Metric) +} + +func StartMetricManager() { + _defaultMetricManager.Start() +} + +func StopMetricManager() { + _defaultMetricManager.Stop() +} + +// OnMeasure push Metric to gorotine pool then handled automatic. +func OnMeasure(metric Metric) { + _defaultMetricManager.OnEvent(metric) +} + +func Initial() { + _onceInitial.Do(func() { + initMetricManager() + }) +} + +func initMetricManager() { var opts []pool.Option s := conf.EventManagerSetting if s.MinWorker > 5 { @@ -34,22 +66,9 @@ func initEventManager() { opts = append(opts, pool.MaxRequestTempBufOpt(10)) } opts = append(opts, pool.MaxTickCountOpt(s.MaxTickCount), pool.TickWaitTimeOpt(s.TickWaitTime)) - _defaultEventManager = event.NewEventManager(func(req event.Event, err error) { + _defaultMetricManager = event.NewEventManager(func(req Metric, err error) { if err != nil { logrus.Errorf("handle event[%s] occurs error: %s", req.Name(), err) } }, opts...) } - -func StartEventManager() { - _defaultEventManager.Start() -} - -func StopEventManager() { - _defaultEventManager.Stop() -} - -// OnEvent push event to gorotine pool then handled automatic. -func OnEvent(event event.Event) { - _defaultEventManager.OnEvent(event) -} diff --git a/internal/metrics/metrics_tryst.go b/internal/metrics/metrics_tryst.go new file mode 100644 index 00000000..ea0eb28d --- /dev/null +++ b/internal/metrics/metrics_tryst.go @@ -0,0 +1,32 @@ +// Copyright 2023 ROC. All rights reserved. +// Use of this source code is governed by a MIT style +// license that can be found in the LICENSE file. + +package metrics + +import ( + "github.com/alimy/tryst/event" + "github.com/alimy/tryst/pool" +) + +type simpleMetricManager struct { + mm event.EventManager +} + +func (s *simpleMetricManager) Start() { + s.mm.Start() +} + +func (s *simpleMetricManager) Stop() { + s.mm.Stop() +} + +func (s *simpleMetricManager) OnMeasure(metric Metric) { + s.mm.OnEvent(metric) +} + +func NewMetricManager(fn pool.RespFn[Metric], opts ...pool.Option) MetricManager { + return &simpleMetricManager{ + mm: event.NewEventManager(fn, opts...), + } +} diff --git a/internal/model/web/audit.go b/internal/model/web/audit.go index 5b891177..848ffbf6 100644 --- a/internal/model/web/audit.go +++ b/internal/model/web/audit.go @@ -12,7 +12,8 @@ const ( ) const ( - AuditHookCtxKey = "audit_ctx_key" + AuditHookCtxKey = "audit_ctx_key" + OnlineUserCtxKey = "online_user_ctx_key" ) type AuditStyle uint8 diff --git a/internal/servants/base/events.go b/internal/servants/base/events.go index 545d3399..2c840a2b 100644 --- a/internal/servants/base/events.go +++ b/internal/servants/base/events.go @@ -99,7 +99,7 @@ func (p *ExpireRespEvent) Name() string { return "servants.base.ExpireRespEvent" } -func (p *ExpireRespEvent) Action() (err error) { +func (p *ExpireRespEvent) Action() error { return p.ac.Delete(p.keys...) } @@ -107,7 +107,7 @@ func (p *ExpireAnyRespEvent) Name() string { return "servants.base.ExpireAnyRespEvent" } -func (p *ExpireAnyRespEvent) Action() (err error) { +func (p *ExpireAnyRespEvent) Action() error { return p.ac.DelAny(p.pattern) } diff --git a/internal/servants/chain/chain.go b/internal/servants/chain/chain.go index 84faeca5..2a611bfc 100644 --- a/internal/servants/chain/chain.go +++ b/internal/servants/chain/chain.go @@ -9,16 +9,19 @@ import ( "github.com/rocboss/paopao-ce/internal/core" "github.com/rocboss/paopao-ce/internal/dao" + "github.com/rocboss/paopao-ce/internal/dao/cache" ) var ( _ums core.UserManageService + _ac core.AppCache _onceUms sync.Once ) func userManageService() core.UserManageService { _onceUms.Do(func() { _ums = dao.DataService() + _ac = cache.NewAppCache() }) return _ums } diff --git a/internal/servants/chain/measure.go b/internal/servants/chain/measure.go new file mode 100644 index 00000000..e3d94c52 --- /dev/null +++ b/internal/servants/chain/measure.go @@ -0,0 +1,21 @@ +// Copyright 2023 ROC. All rights reserved. +// Use of this source code is governed by a MIT style +// license that can be found in the LICENSE file. + +package chain + +import ( + "github.com/gin-gonic/gin" + "github.com/rocboss/paopao-ce/internal/servants/base" +) + +func OnlineUserMeasure() gin.HandlerFunc { + return func(c *gin.Context) { + // 此midleware后面是真正的http handlder,让handler先执行 + c.Next() + // 更新用户在线状态 + if uid, ok := base.UserIdFrom(c); ok { + OnUserOnlineMetric(_ac, uid) + } + } +} diff --git a/internal/servants/chain/metrics.go b/internal/servants/chain/metrics.go new file mode 100644 index 00000000..e159829e --- /dev/null +++ b/internal/servants/chain/metrics.go @@ -0,0 +1,36 @@ +// Copyright 2023 ROC. All rights reserved. +// Use of this source code is governed by a MIT style +// license that can be found in the LICENSE file. + +package chain + +import ( + "github.com/rocboss/paopao-ce/internal/conf" + "github.com/rocboss/paopao-ce/internal/core" + "github.com/rocboss/paopao-ce/internal/metrics" +) + +type OnlineUserMetric struct { + metrics.BaseMetric + ac core.AppCache + uid int64 + expire int64 +} + +func OnUserOnlineMetric(ac core.AppCache, uid int64) { + metrics.OnMeasure(&OnlineUserMetric{ + ac: ac, + uid: uid, + expire: conf.CacheSetting.OnlineUserExpire, + }) +} + +func (m *OnlineUserMetric) Name() string { + return "OnlineUserMetric" +} + +func (m *OnlineUserMetric) Action() (err error) { + // 暂时仅做标记,不存储其他相关信息 + m.ac.Set(conf.KeyOnlineUser.Get(m.uid), []byte{}, m.expire) + return +} diff --git a/internal/servants/web/relax.go b/internal/servants/web/relax.go index 7b6dd71e..4c230f66 100644 --- a/internal/servants/web/relax.go +++ b/internal/servants/web/relax.go @@ -26,6 +26,14 @@ type relaxSrv struct { wc core.WebCache } +type relaxChain struct { + api.UnimplementedRelaxChain +} + +func (s *relaxChain) ChainGetUnreadMsgCount() gin.HandlersChain { + return gin.HandlersChain{chain.OnlineUserMeasure()} +} + func (s *relaxSrv) Chain() gin.HandlersChain { return gin.HandlersChain{chain.JwtSurely()} } @@ -50,3 +58,7 @@ func newRelaxSrv(s *base.DaoServant, wc core.WebCache) api.Relax { wc: wc, } } + +func newRelaxChain() api.RelaxChain { + return &relaxChain{} +} diff --git a/internal/servants/web/web.go b/internal/servants/web/web.go index e281310a..42532888 100644 --- a/internal/servants/web/web.go +++ b/internal/servants/web/web.go @@ -34,7 +34,7 @@ func RouteWeb(e *gin.Engine) { // aways register servants api.RegisterAdminServant(e, newAdminSrv(ds)) api.RegisterCoreServant(e, newCoreSrv(ds, _oss, _wc)) - api.RegisterRelaxServant(e, newRelaxSrv(ds, _wc)) + api.RegisterRelaxServant(e, newRelaxSrv(ds, _wc), newRelaxChain()) api.RegisterLooseServant(e, newLooseSrv(ds, _ac)) api.RegisterPrivServant(e, newPrivSrv(ds, _oss), newPrivChain()) api.RegisterPubServant(e, newPubSrv(ds)) diff --git a/mirc/web/v1/relax.go b/mirc/web/v1/relax.go index af4bbe83..3bdfdb91 100644 --- a/mirc/web/v1/relax.go +++ b/mirc/web/v1/relax.go @@ -16,5 +16,5 @@ type Relax struct { Group `mir:"v1"` // GetUnreadMsgCount 获取当前用户未读消息数量 - GetUnreadMsgCount func(Get, web.GetUnreadMsgCountReq) web.GetUnreadMsgCountResp `mir:"/user/msgcount/unread"` + GetUnreadMsgCount func(Get, Chain, web.GetUnreadMsgCountReq) web.GetUnreadMsgCountResp `mir:"/user/msgcount/unread"` } diff --git a/pkg/types/types.go b/pkg/types/types.go index 725f18e4..d8321ad4 100644 --- a/pkg/types/types.go +++ b/pkg/types/types.go @@ -23,3 +23,7 @@ type Boxes[T any] interface { Box(t T) Unbox() T } + +type Integer interface { + ~int8 | ~int16 | ~int32 | ~int64 | ~uint8 | ~uint16 | ~uint32 | ~uint64 | ~int | ~uint +}