Merge branch 'x/gorm' into x/sqlc

r/paopao-ce-pro
Michael Li 2 years ago
commit 4bd921c3a4
No known key found for this signature in database

@ -383,6 +383,8 @@ release/paopao serve --no-default-features --features sqlite3,localoss,loggerfil
|[`Pyroscope`](docs/proposal/23021510-关于使用pyroscope用于性能调试的设计.md)| 性能优化 | 内测 | 开启Pyroscope功能用于性能调试 |
|[`Pprof`](docs/proposal/23062905-添加Pprof功能特性用于获取Profile.md)| 性能优化 | 内测 | 开启Pprof功能收集Profile信息 |
|`PhoneBind` | 其他 | 稳定 | 手机绑定功能 |
|`UseAuditHook` | 其他 | 内测 | 使用审核hook功能 |
|`UseJobManager` | 其他 | 内测 | 使用JobManager功能 |
|`Web:DisallowUserRegister` | 功能特性 | 稳定 | 不允许用户注册 |
> 功能项状态详情参考 [features-status](features-status.md).

@ -44,8 +44,20 @@ type Priv interface {
mustEmbedUnimplementedPrivServant()
}
type PrivChain interface {
ChainCreateTweet() gin.HandlersChain
mustEmbedUnimplementedPrivChain()
}
// RegisterPrivServant register Priv servant to gin
func RegisterPrivServant(e *gin.Engine, s Priv) {
func RegisterPrivServant(e *gin.Engine, s Priv, m ...PrivChain) {
var cc PrivChain
if len(m) > 0 {
cc = m[0]
} else {
cc = &UnimplementedPrivChain{}
}
router := e.Group("v1")
// use chain for router
middlewares := s.Chain()
@ -297,7 +309,7 @@ func RegisterPrivServant(e *gin.Engine, s Priv) {
}
s.Render(c, nil, s.DeleteTweet(req))
})
router.Handle("POST", "/post", func(c *gin.Context) {
router.Handle("POST", "/post", append(cc.ChainCreateTweet(), func(c *gin.Context) {
select {
case <-c.Request.Context().Done():
return
@ -310,8 +322,13 @@ func RegisterPrivServant(e *gin.Engine, s Priv) {
return
}
resp, err := s.CreateTweet(req)
s.Render(c, resp, err)
})
if err != nil {
s.Render(c, nil, err)
return
}
var rv _render_ = resp
rv.Render(c)
})...)
router.Handle("GET", "/attachment", func(c *gin.Context) {
select {
case <-c.Request.Context().Done():
@ -455,3 +472,12 @@ func (UnimplementedPrivServant) UploadAttachment(req *web.UploadAttachmentReq) (
}
func (UnimplementedPrivServant) mustEmbedUnimplementedPrivServant() {}
// UnimplementedPrivChain can be embedded to have forward compatible implementations.
type UnimplementedPrivChain struct{}
func (b *UnimplementedPrivChain) ChainCreateTweet() gin.HandlersChain {
return nil
}
func (b *UnimplementedPrivChain) mustEmbedUnimplementedPrivChain() {}

@ -23,15 +23,27 @@ type Relax interface {
mustEmbedUnimplementedRelaxServant()
}
type RelaxChain interface {
ChainGetUnreadMsgCount() gin.HandlersChain
mustEmbedUnimplementedRelaxChain()
}
// RegisterRelaxServant register Relax servant to gin
func RegisterRelaxServant(e *gin.Engine, s Relax) {
func RegisterRelaxServant(e *gin.Engine, s Relax, m ...RelaxChain) {
var cc RelaxChain
if len(m) > 0 {
cc = m[0]
} else {
cc = &UnimplementedRelaxChain{}
}
router := e.Group("v1")
// use chain for router
middlewares := s.Chain()
router.Use(middlewares...)
// register routes info to router
router.Handle("GET", "/user/msgcount/unread", func(c *gin.Context) {
router.Handle("GET", "/user/msgcount/unread", append(cc.ChainGetUnreadMsgCount(), func(c *gin.Context) {
select {
case <-c.Request.Context().Done():
return
@ -49,7 +61,7 @@ func RegisterRelaxServant(e *gin.Engine, s Relax) {
}
var rv _render_ = resp
rv.Render(c)
})
})...)
}
// UnimplementedRelaxServant can be embedded to have forward compatible implementations.
@ -64,3 +76,12 @@ func (UnimplementedRelaxServant) GetUnreadMsgCount(req *web.GetUnreadMsgCountReq
}
func (UnimplementedRelaxServant) mustEmbedUnimplementedRelaxServant() {}
// UnimplementedRelaxChain can be embedded to have forward compatible implementations.
type UnimplementedRelaxChain struct{}
func (b *UnimplementedRelaxChain) ChainGetUnreadMsgCount() gin.HandlersChain {
return nil
}
func (b *UnimplementedRelaxChain) mustEmbedUnimplementedRelaxChain() {}

@ -201,7 +201,17 @@
* `PhoneBind` 手机绑定功能;
* [ ] 提按文档
* [x] 接口定义
* [x] 业务逻辑实现
* [x] 业务逻辑实现
* `UseAuditHook` 使用审核hook功能 (目前状态: 内测 待完善后将转为Builtin)
* [ ] 提按文档
* [x] 接口定义
* [x] 业务逻辑实现
* `UseJobManager` 使用JobManager功能 (目前状态: 内测 待完善后将转为Builtin)
* [ ] 提按文档
* [x] 接口定义
* [x] 业务逻辑实现
### 功能特性:
* `Web:DisallowUserRegister` 不允许用户注册;

@ -6,7 +6,7 @@ require (
github.com/Masterminds/semver/v3 v3.2.1
github.com/afocus/captcha v0.0.0-20191010092841-4bd1f21c8868
github.com/alimy/mir/v4 v4.0.0
github.com/alimy/tryst v0.8.2
github.com/alimy/tryst v0.8.3
github.com/aliyun/aliyun-oss-go-sdk v2.2.9+incompatible
github.com/allegro/bigcache/v3 v3.1.0
github.com/bufbuild/connect-go v1.10.0

@ -125,8 +125,8 @@ github.com/alexflint/go-filemutex v0.0.0-20171022225611-72bdc8eae2ae/go.mod h1:C
github.com/alexflint/go-filemutex v1.1.0/go.mod h1:7P4iRhttt/nUvUOrYIhcpMzv2G6CY9UnI16Z+UJqRyk=
github.com/alimy/mir/v4 v4.0.0 h1:MzGfmoLjjvR69jbZEmpKJO3tUuqB0RGRv1UWPbtukBg=
github.com/alimy/mir/v4 v4.0.0/go.mod h1:d58dBvw2KImcVbAUANrciEV/of0arMNsI9c/5UNCMMc=
github.com/alimy/tryst v0.8.2 h1:azu5B58vS6m/ZeHovYGWjVvEOJN2llDIBLxuN3qtMtk=
github.com/alimy/tryst v0.8.2/go.mod h1:ua2eJbFrisHPh7z93Bgc0jNBE8Khu1SCx2p/6t3OzZI=
github.com/alimy/tryst v0.8.3 h1:k54a9YesCGUTqfyDp9NL55TI8CxIj8HNJZyzbIoNab8=
github.com/alimy/tryst v0.8.3/go.mod h1:ua2eJbFrisHPh7z93Bgc0jNBE8Khu1SCx2p/6t3OzZI=
github.com/aliyun/aliyun-oss-go-sdk v2.2.9+incompatible h1:Sg/2xHwDrioHpxTN6WMiwbXTpUEinBpHsN7mG21Rc2k=
github.com/aliyun/aliyun-oss-go-sdk v2.2.9+incompatible/go.mod h1:T/Aws4fEfogEE9v+HPhhw+CntffsBHJ8nXQCwKr0/g8=
github.com/allegro/bigcache/v3 v3.1.0 h1:H2Vp8VOvxcrB91o86fUSVJFqeuz8kpyyB02eH3bSzwk=

@ -8,6 +8,7 @@ import (
"fmt"
"github.com/alimy/tryst/cache"
"github.com/rocboss/paopao-ce/pkg/types"
)
const (
@ -16,12 +17,21 @@ const (
// 以下包含一些在cache中会用到的key的前缀
const (
PrefixUserTweets = "paopao:usertweets:"
PrefixNewestTweets = "paopao:newesttweets:"
PrefixHotsTweets = "paopao:hotstweets:"
PrefixFollowingTweets = "paopao:followingtweets:"
PrefixUserTweets = "paopao:usertweets:"
PrefixUnreadmsg = "paopao:unreadmsg:"
PrefixOnlineUser = "paopao:onlineuser:"
)
// 以下包含一些在cache中会用到的池化后的key
var (
KeyUnreadMsg cache.KeyPool[int64]
KeyNewestTweets cache.KeyPool[int]
KeyHotsTweets cache.KeyPool[int]
KeyFollowingTweets cache.KeyPool[string]
KeyUnreadMsg cache.KeyPool[int64]
KeyOnlineUser cache.KeyPool[int64]
)
func initCacheKeyPool() {
@ -29,7 +39,25 @@ func initCacheKeyPool() {
if poolSize < CacheSetting.KeyPoolSize {
poolSize = CacheSetting.KeyPoolSize
}
KeyUnreadMsg = cache.MustKeyPool[int64](poolSize, func(key int64) string {
return fmt.Sprintf("paopao:unreadmsg:%d", key)
KeyNewestTweets = intKeyPool[int](poolSize, PrefixNewestTweets)
KeyHotsTweets = intKeyPool[int](poolSize, PrefixHotsTweets)
KeyFollowingTweets = strKeyPool(poolSize, PrefixFollowingTweets)
KeyUnreadMsg = intKeyPool[int64](poolSize, PrefixUnreadmsg)
KeyOnlineUser = intKeyPool[int64](poolSize, PrefixOnlineUser)
}
func strKeyPool(size int, prefix string) cache.KeyPool[string] {
return cache.MustKeyPool(size, func(key string) string {
return fmt.Sprintf("%s%s", prefix, key)
})
}
func intKeyPool[T types.Integer](size int, prefix string) cache.KeyPool[T] {
return cache.MustKeyPool[T](size, intKey[T](prefix))
}
func intKey[T types.Integer](prefix string) func(T) string {
return func(key T) string {
return fmt.Sprintf("%s%d", prefix, key)
}
}

@ -37,6 +37,7 @@ var (
AppSetting *appConf
CacheSetting *cacheConf
EventManagerSetting *eventManagerConf
MetricManagerSetting *metricManagerConf
CacheIndexSetting *cacheIndexConf
SimpleCacheIndexSetting *simpleCacheIndexConf
BigCacheIndexSetting *bigCacheIndexConf
@ -73,6 +74,7 @@ func setupSetting(suite []string, noDefault bool) error {
"App": &AppSetting,
"Cache": &CacheSetting,
"EventManager": &EventManagerSetting,
"MetricManager": &MetricManagerSetting,
"PprofServer": &PprofServerSetting,
"WebServer": &WebServerSetting,
"AdminServer": &AdminServerSetting,
@ -121,6 +123,7 @@ func setupSetting(suite []string, noDefault bool) error {
CacheSetting.CientSideCacheExpire *= time.Second
EventManagerSetting.TickWaitTime *= time.Second
MetricManagerSetting.TickWaitTime *= time.Second
JWTSetting.Expire *= time.Second
SimpleCacheIndexSetting.CheckTickDuration *= time.Second
SimpleCacheIndexSetting.ExpireTickDuration *= time.Second

@ -10,10 +10,17 @@ Cache:
CientSideCacheExpire: 60 # 客户端缓存过期时间 默认60s
UnreadMsgExpire: 60 # 未读消息过期时间,单位秒, 默认60s
UserTweetsExpire: 60 # 获取用户推文列表过期时间,单位秒, 默认60s
OnlineUserExpire: 300 # 标记在线用户 过期时间,单位秒, 默认300s
EventManager: # 事件管理器的配置参数
MinWorker: 10 # 最小后台工作者, 设置范围[5, ++], 默认10
MaxEventBuf: 100 # 最大log缓存条数, 设置范围[10, ++], 默认100
MaxTempEventBuf: 100 # 最大log缓存条数, 设置范围[10, ++], 默认100
MinWorker: 64 # 最小后台工作者, 设置范围[5, ++], 默认64
MaxEventBuf: 128 # 最大log缓存条数, 设置范围[10, ++], 默认128
MaxTempEventBuf: 256 # 最大log缓存条数, 设置范围[10, ++], 默认256
MaxTickCount: 60 # 最大的循环周期, 设置范围[60, ++], 默认60
TickWaitTime: 1 # 一个周期的等待时间,单位:秒 默认1s
MetricManager: # 指标监控管理器的配置参数
MinWorker: 32 # 最小后台工作者, 设置范围[5, ++], 默认32
MaxEventBuf: 128 # 最大log缓存条数, 设置范围[10, ++], 默认128
MaxTempEventBuf: 256 # 最大log缓存条数, 设置范围[10, ++], 默认256
MaxTickCount: 60 # 最大的循环周期, 设置范围[60, ++], 默认60
TickWaitTime: 1 # 一个周期的等待时间,单位:秒 默认1s
Features:

@ -101,6 +101,7 @@ type cacheConf struct {
CientSideCacheExpire time.Duration
UnreadMsgExpire int64
UserTweetsExpire int64
OnlineUserExpire int64
}
type eventManagerConf struct {
@ -111,6 +112,14 @@ type eventManagerConf struct {
TickWaitTime time.Duration
}
type metricManagerConf struct {
MinWorker int
MaxEventBuf int
MaxTempEventBuf int
MaxTickCount int
TickWaitTime time.Duration
}
type cacheIndexConf struct {
MaxUpdateQPS int
MinWorker int

@ -101,6 +101,7 @@ type RedisCache interface {
type AppCache interface {
Get(key string) ([]byte, error)
Set(key string, data []byte, ex int64) error
SetNx(key string, data []byte, ex int64) error
Delete(key ...string) error
DelAny(pattern string) error
Exist(key string) bool

@ -39,12 +39,21 @@ func (s *appCache) Get(key string) ([]byte, error) {
}
func (s *appCache) Set(key string, data []byte, ex int64) error {
return s.c.Do(context.Background(), s.c.B().Set().
Key(key).
Value(utils.String(data)).
ExSeconds(ex).
Build()).
Error()
ctx := context.Background()
cmd := s.c.B().Set().Key(key).Value(utils.String(data))
if ex > 0 {
return s.c.Do(ctx, cmd.ExSeconds(ex).Build()).Error()
}
return s.c.Do(ctx, cmd.Build()).Error()
}
func (s *appCache) SetNx(key string, data []byte, ex int64) error {
ctx := context.Background()
cmd := s.c.B().Set().Key(key).Value(utils.String(data)).Nx()
if ex > 0 {
return s.c.Do(ctx, cmd.ExSeconds(ex).Build()).Error()
}
return s.c.Do(ctx, cmd.Build()).Error()
}
func (s *appCache) Delete(keys ...string) (err error) {

@ -8,19 +8,105 @@ import (
"sync"
"github.com/alimy/tryst/cfg"
"github.com/alimy/tryst/pool"
"github.com/robfig/cron/v3"
"github.com/rocboss/paopao-ce/internal/conf"
"github.com/sirupsen/logrus"
)
var (
_onceInitial sync.Once
_defaultEventManager EventManager
_defaultJobManager JobManager
_onceInitial sync.Once
)
func StartEventManager() {
_defaultEventManager.Start()
}
func StopEventManager() {
_defaultEventManager.Stop()
}
// OnEvent push event to gorotine pool then handled automatic.
func OnEvent(event Event) {
_defaultEventManager.OnEvent(event)
}
func StartJobManager() {
_defaultJobManager.Start()
}
func StopJobManager() {
_defaultJobManager.Stop()
}
// NewJob create new Job instance
func NewJob(s cron.Schedule, fn JobFn) Job {
return &simpleJob{
Schedule: s,
Job: fn,
}
}
// RemoveJob an entry from being run in the future.
func RemoveJob(id EntryID) {
_defaultJobManager.Remove(id)
}
// ScheduleJob adds a Job to the Cron to be run on the given schedule.
// The job is wrapped with the configured Chain.
func ScheduleJob(job Job) EntryID {
return _defaultJobManager.Schedule(job)
}
// Schedule adds a Job to the Cron to be run on the given schedule.
// The job is wrapped with the configured Chain.
func Schedule(s cron.Schedule, fn JobFn) EntryID {
job := &simpleJob{
Schedule: s,
Job: fn,
}
return _defaultJobManager.Schedule(job)
}
func Initial() {
_onceInitial.Do(func() {
initEventManager()
if cfg.If("JobManager") {
if cfg.If("UseJobManager") {
initJobManager()
logrus.Debugln("initial JobManager")
}
})
}
func initJobManager() {
_defaultJobManager = NewJobManager()
StartJobManager()
}
func initEventManager() {
var opts []pool.Option
s := conf.EventManagerSetting
if s.MinWorker > 5 {
opts = append(opts, pool.MinWorkerOpt(s.MinWorker))
} else {
opts = append(opts, pool.MinWorkerOpt(5))
}
if s.MaxEventBuf > 10 {
opts = append(opts, pool.MaxRequestBufOpt(s.MaxEventBuf))
} else {
opts = append(opts, pool.MaxRequestBufOpt(10))
}
if s.MaxTempEventBuf > 10 {
opts = append(opts, pool.MaxRequestTempBufOpt(s.MaxTempEventBuf))
} else {
opts = append(opts, pool.MaxRequestTempBufOpt(10))
}
opts = append(opts, pool.MaxTickCountOpt(s.MaxTickCount), pool.TickWaitTimeOpt(s.TickWaitTime))
_defaultEventManager = NewEventManager(func(req Event, err error) {
if err != nil {
logrus.Errorf("handle event[%s] occurs error: %s", req.Name(), err)
}
}, opts...)
}

@ -0,0 +1,40 @@
// Copyright 2023 ROC. All rights reserved.
// Use of this source code is governed by a MIT style
// license that can be found in the LICENSE file.
package events
import (
"github.com/alimy/tryst/event"
"github.com/alimy/tryst/pool"
)
type Event = event.Event
type EventManager interface {
Start()
Stop()
OnEvent(event Event)
}
type simpleEventManager struct {
em event.EventManager
}
func (s *simpleEventManager) Start() {
s.em.Start()
}
func (s *simpleEventManager) Stop() {
s.em.Stop()
}
func (s *simpleEventManager) OnEvent(event Event) {
s.em.OnEvent(event)
}
func NewEventManager(fn pool.RespFn[Event], opts ...pool.Option) EventManager {
return &simpleEventManager{
em: event.NewEventManager(fn, opts...),
}
}

@ -8,10 +8,6 @@ import (
"github.com/robfig/cron/v3"
)
var (
_defaultJobManager JobManager = (*jobManager)(nil)
)
type (
EntryID = cron.EntryID
)
@ -65,46 +61,8 @@ func (j *jobManager) Schedule(job Job) EntryID {
return j.m.Schedule(job, job)
}
func initJobManager() {
_defaultJobManager = &jobManager{
func NewJobManager() JobManager {
return &jobManager{
m: cron.New(),
}
StartJobManager()
}
func StartJobManager() {
_defaultJobManager.Start()
}
func StopJobManager() {
_defaultJobManager.Stop()
}
// NewJob create new Job instance
func NewJob(s cron.Schedule, fn JobFn) Job {
return &simpleJob{
Schedule: s,
Job: fn,
}
}
// RemoveJob an entry from being run in the future.
func RemoveJob(id EntryID) {
_defaultJobManager.Remove(id)
}
// ScheduleJob adds a Job to the Cron to be run on the given schedule.
// The job is wrapped with the configured Chain.
func ScheduleJob(job Job) EntryID {
return _defaultJobManager.Schedule(job)
}
// Schedule adds a Job to the Cron to be run on the given schedule.
// The job is wrapped with the configured Chain.
func Schedule(s cron.Schedule, fn JobFn) EntryID {
job := &simpleJob{
Schedule: s,
Job: fn,
}
return _defaultJobManager.Schedule(job)
}

@ -6,6 +6,7 @@ package internal
import (
"github.com/rocboss/paopao-ce/internal/events"
"github.com/rocboss/paopao-ce/internal/metrics"
"github.com/rocboss/paopao-ce/internal/migration"
)
@ -14,4 +15,6 @@ func Initial() {
migration.Run()
// event manager system initialize
events.Initial()
// metric manager system initialize
metrics.Initial()
}

@ -2,9 +2,11 @@
// Use of this source code is governed by a MIT style
// license that can be found in the LICENSE file.
package events
package metrics
import (
"sync"
"github.com/alimy/tryst/event"
"github.com/alimy/tryst/pool"
"github.com/rocboss/paopao-ce/internal/conf"
@ -12,10 +14,40 @@ import (
)
var (
_defaultEventManager event.EventManager
_defaultMetricManager event.EventManager
_onceInitial sync.Once
)
func initEventManager() {
type Metric = event.Event
type BaseMetric = event.UnimplementedEvent
type MetricManager interface {
Start()
Stop()
OnMeasure(metric Metric)
}
func StartMetricManager() {
_defaultMetricManager.Start()
}
func StopMetricManager() {
_defaultMetricManager.Stop()
}
// OnMeasure push Metric to gorotine pool then handled automatic.
func OnMeasure(metric Metric) {
_defaultMetricManager.OnEvent(metric)
}
func Initial() {
_onceInitial.Do(func() {
initMetricManager()
})
}
func initMetricManager() {
var opts []pool.Option
s := conf.EventManagerSetting
if s.MinWorker > 5 {
@ -34,22 +66,9 @@ func initEventManager() {
opts = append(opts, pool.MaxRequestTempBufOpt(10))
}
opts = append(opts, pool.MaxTickCountOpt(s.MaxTickCount), pool.TickWaitTimeOpt(s.TickWaitTime))
_defaultEventManager = event.NewEventManager(func(req event.Event, err error) {
_defaultMetricManager = event.NewEventManager(func(req Metric, err error) {
if err != nil {
logrus.Errorf("handle event[%s] occurs error: %s", req.Name(), err)
}
}, opts...)
}
func StartEventManager() {
_defaultEventManager.Start()
}
func StopEventManager() {
_defaultEventManager.Stop()
}
// OnEvent push event to gorotine pool then handled automatic.
func OnEvent(event event.Event) {
_defaultEventManager.OnEvent(event)
}

@ -0,0 +1,32 @@
// Copyright 2023 ROC. All rights reserved.
// Use of this source code is governed by a MIT style
// license that can be found in the LICENSE file.
package metrics
import (
"github.com/alimy/tryst/event"
"github.com/alimy/tryst/pool"
)
type simpleMetricManager struct {
mm event.EventManager
}
func (s *simpleMetricManager) Start() {
s.mm.Start()
}
func (s *simpleMetricManager) Stop() {
s.mm.Stop()
}
func (s *simpleMetricManager) OnMeasure(metric Metric) {
s.mm.OnEvent(metric)
}
func NewMetricManager(fn pool.RespFn[Metric], opts ...pool.Option) MetricManager {
return &simpleMetricManager{
mm: event.NewEventManager(fn, opts...),
}
}

@ -0,0 +1,40 @@
// Copyright 2023 ROC. All rights reserved.
// Use of this source code is governed by a MIT style
// license that can be found in the LICENSE file.
package web
const (
AuditStyleUnknown AuditStyle = iota
AuditStyleUserTweet
AuditStyleUserTweetComment
AuditStyleUserTweetReply
)
const (
AuditHookCtxKey = "audit_ctx_key"
OnlineUserCtxKey = "online_user_ctx_key"
)
type AuditStyle uint8
type AuditMetaInfo struct {
Style AuditStyle
Id int64
}
func (s AuditStyle) String() (res string) {
switch s {
case AuditStyleUserTweet:
res = "UserTweet"
case AuditStyleUserTweetComment:
res = "UserTweetComment"
case AuditStyleUserTweetReply:
res = "UserTweetReply"
case AuditStyleUnknown:
fallthrough
default:
res = "Unknown"
}
return
}

@ -7,12 +7,14 @@ package web
import (
"fmt"
"mime/multipart"
"net/http"
"strings"
"github.com/alimy/mir/v4"
"github.com/gin-gonic/gin"
"github.com/rocboss/paopao-ce/internal/core"
"github.com/rocboss/paopao-ce/internal/core/ms"
"github.com/rocboss/paopao-ce/internal/model/joint"
"github.com/rocboss/paopao-ce/internal/servants/base"
"github.com/rocboss/paopao-ce/pkg/convert"
"github.com/rocboss/paopao-ce/pkg/xerror"
@ -281,3 +283,16 @@ func (r *CreateCommentReq) Bind(c *gin.Context) mir.Error {
r.ClientIP = c.ClientIP()
return bindAny(c, r)
}
func (r *CreateTweetResp) Render(c *gin.Context) {
c.JSON(http.StatusOK, &joint.JsonResp{
Code: 0,
Msg: "success",
Data: r,
})
// 设置审核元信息,用于接下来的审核逻辑
c.Set(AuditHookCtxKey, &AuditMetaInfo{
Style: AuditStyleUserTweet,
Id: r.ID,
})
}

@ -99,7 +99,7 @@ func (p *ExpireRespEvent) Name() string {
return "servants.base.ExpireRespEvent"
}
func (p *ExpireRespEvent) Action() (err error) {
func (p *ExpireRespEvent) Action() error {
return p.ac.Delete(p.keys...)
}
@ -107,7 +107,7 @@ func (p *ExpireAnyRespEvent) Name() string {
return "servants.base.ExpireAnyRespEvent"
}
func (p *ExpireAnyRespEvent) Action() (err error) {
func (p *ExpireAnyRespEvent) Action() error {
return p.ac.DelAny(p.pattern)
}

@ -0,0 +1,25 @@
// Copyright 2023 ROC. All rights reserved.
// Use of this source code is governed by a MIT style
// license that can be found in the LICENSE file.
package chain
import (
"github.com/gin-gonic/gin"
"github.com/rocboss/paopao-ce/internal/model/web"
)
func AuditHook() gin.HandlerFunc {
return func(c *gin.Context) {
// 此midleware后面是真正的http handlder让handler先执行
c.Next()
// 审核hook 后处理逻辑
var ami *web.AuditMetaInfo
if val, ok := c.Get(web.AuditHookCtxKey); ok {
if ami, ok = val.(*web.AuditMetaInfo); !ok {
return
}
}
OnAudiotHookEvent(ami)
}
}

@ -9,16 +9,19 @@ import (
"github.com/rocboss/paopao-ce/internal/core"
"github.com/rocboss/paopao-ce/internal/dao"
"github.com/rocboss/paopao-ce/internal/dao/cache"
)
var (
_ums core.UserManageService
_ac core.AppCache
_onceUms sync.Once
)
func userManageService() core.UserManageService {
_onceUms.Do(func() {
_ums = dao.DataService()
_ac = cache.NewAppCache()
})
return _ums
}

@ -0,0 +1,35 @@
// Copyright 2023 ROC. All rights reserved.
// Use of this source code is governed by a MIT style
// license that can be found in the LICENSE file.
package chain
import (
"github.com/alimy/tryst/event"
"github.com/rocboss/paopao-ce/internal/events"
"github.com/rocboss/paopao-ce/internal/model/web"
"github.com/sirupsen/logrus"
)
type AuditHookEvent struct {
event.UnimplementedEvent
ami *web.AuditMetaInfo
}
func (e *AuditHookEvent) Name() string {
return "AuditHookEvent"
}
func (e *AuditHookEvent) Action() error {
// TODO: just log event now will add real logic in future.
logrus.Debugf("auditHook event action style[%s] id[%d]", e.ami.Style, e.ami.Id)
return nil
}
func OnAudiotHookEvent(ami *web.AuditMetaInfo) {
if ami != nil {
events.OnEvent(&AuditHookEvent{
ami: ami,
})
}
}

@ -0,0 +1,21 @@
// Copyright 2023 ROC. All rights reserved.
// Use of this source code is governed by a MIT style
// license that can be found in the LICENSE file.
package chain
import (
"github.com/gin-gonic/gin"
"github.com/rocboss/paopao-ce/internal/servants/base"
)
func OnlineUserMeasure() gin.HandlerFunc {
return func(c *gin.Context) {
// 此midleware后面是真正的http handlder让handler先执行
c.Next()
// 更新用户在线状态
if uid, ok := base.UserIdFrom(c); ok {
OnUserOnlineMetric(_ac, uid)
}
}
}

@ -0,0 +1,36 @@
// Copyright 2023 ROC. All rights reserved.
// Use of this source code is governed by a MIT style
// license that can be found in the LICENSE file.
package chain
import (
"github.com/rocboss/paopao-ce/internal/conf"
"github.com/rocboss/paopao-ce/internal/core"
"github.com/rocboss/paopao-ce/internal/metrics"
)
type OnlineUserMetric struct {
metrics.BaseMetric
ac core.AppCache
uid int64
expire int64
}
func OnUserOnlineMetric(ac core.AppCache, uid int64) {
metrics.OnMeasure(&OnlineUserMetric{
ac: ac,
uid: uid,
expire: conf.CacheSetting.OnlineUserExpire,
})
}
func (m *OnlineUserMetric) Name() string {
return "OnlineUserMetric"
}
func (m *OnlineUserMetric) Action() (err error) {
// 暂时仅做标记,不存储其他相关信息
m.ac.Set(conf.KeyOnlineUser.Get(m.uid), []byte{}, m.expire)
return
}

@ -10,6 +10,7 @@ import (
"time"
"github.com/alimy/mir/v4"
"github.com/alimy/tryst/cfg"
"github.com/disintegration/imaging"
"github.com/gin-gonic/gin"
"github.com/gofrs/uuid/v5"
@ -27,7 +28,8 @@ import (
)
var (
_ api.Priv = (*privSrv)(nil)
_ api.Priv = (*privSrv)(nil)
_ api.PrivChain = (*privChain)(nil)
_uploadAttachmentTypeMap = map[string]ms.AttachmentType{
"public/image": ms.AttachmentTypeImage,
@ -35,12 +37,6 @@ var (
"public/video": ms.AttachmentTypeVideo,
"attachment": ms.AttachmentTypeOther,
}
_uploadAttachmentTypes = map[string]cs.AttachmentType{
"public/image": cs.AttachmentTypeImage,
"public/avatar": cs.AttachmentTypeImage,
"public/video": cs.AttachmentTypeVideo,
"attachment": cs.AttachmentTypeOther,
}
)
type privSrv struct {
@ -50,6 +46,17 @@ type privSrv struct {
oss core.ObjectStorageService
}
type privChain struct {
api.UnimplementedPrivChain
}
func (s *privChain) ChainCreateTweet() (res gin.HandlersChain) {
if cfg.If("UseAuditHook") {
res = gin.HandlersChain{chain.AuditHook()}
}
return
}
func (s *privSrv) Chain() gin.HandlersChain {
return gin.HandlersChain{chain.JWT(), chain.Priv()}
}
@ -846,3 +853,7 @@ func newPrivSrv(s *base.DaoServant, oss core.ObjectStorageService) api.Priv {
oss: oss,
}
}
func newPrivChain() api.PrivChain {
return &privChain{}
}

@ -26,6 +26,14 @@ type relaxSrv struct {
wc core.WebCache
}
type relaxChain struct {
api.UnimplementedRelaxChain
}
func (s *relaxChain) ChainGetUnreadMsgCount() gin.HandlersChain {
return gin.HandlersChain{chain.OnlineUserMeasure()}
}
func (s *relaxSrv) Chain() gin.HandlersChain {
return gin.HandlersChain{chain.JwtSurely()}
}
@ -50,3 +58,7 @@ func newRelaxSrv(s *base.DaoServant, wc core.WebCache) api.Relax {
wc: wc,
}
}
func newRelaxChain() api.RelaxChain {
return &relaxChain{}
}

@ -34,9 +34,9 @@ func RouteWeb(e *gin.Engine) {
// aways register servants
api.RegisterAdminServant(e, newAdminSrv(ds))
api.RegisterCoreServant(e, newCoreSrv(ds, _oss, _wc))
api.RegisterRelaxServant(e, newRelaxSrv(ds, _wc))
api.RegisterRelaxServant(e, newRelaxSrv(ds, _wc), newRelaxChain())
api.RegisterLooseServant(e, newLooseSrv(ds, _ac))
api.RegisterPrivServant(e, newPrivSrv(ds, _oss))
api.RegisterPrivServant(e, newPrivSrv(ds, _oss), newPrivChain())
api.RegisterPubServant(e, newPubSrv(ds))
api.RegisterFollowshipServant(e, newFollowshipSrv(ds))
api.RegisterFriendshipServant(e, newFriendshipSrv(ds))

@ -25,7 +25,7 @@ type Priv struct {
DownloadAttachment func(Get, web.DownloadAttachmentReq) web.DownloadAttachmentResp `mir:"/attachment"`
// CreateTweet 发布动态
CreateTweet func(Post, web.CreateTweetReq) web.CreateTweetResp `mir:"/post"`
CreateTweet func(Post, Chain, web.CreateTweetReq) web.CreateTweetResp `mir:"/post"`
// DeleteTweet 删除动态
DeleteTweet func(Delete, web.DeleteTweetReq) `mir:"/post"`

@ -16,5 +16,5 @@ type Relax struct {
Group `mir:"v1"`
// GetUnreadMsgCount 获取当前用户未读消息数量
GetUnreadMsgCount func(Get, web.GetUnreadMsgCountReq) web.GetUnreadMsgCountResp `mir:"/user/msgcount/unread"`
GetUnreadMsgCount func(Get, Chain, web.GetUnreadMsgCountReq) web.GetUnreadMsgCountResp `mir:"/user/msgcount/unread"`
}

@ -23,3 +23,7 @@ type Boxes[T any] interface {
Box(t T)
Unbox() T
}
type Integer interface {
~int8 | ~int16 | ~int32 | ~int64 | ~uint8 | ~uint16 | ~uint32 | ~uint64 | ~int | ~uint
}

Loading…
Cancel
Save