add base metrics component to measure some server status and add measure online user logic

pull/385/head
Michael Li 1 year ago
parent 2889228fef
commit 771a942b67
No known key found for this signature in database

@ -23,15 +23,27 @@ type Relax interface {
mustEmbedUnimplementedRelaxServant()
}
type RelaxChain interface {
ChainGetUnreadMsgCount() gin.HandlersChain
mustEmbedUnimplementedRelaxChain()
}
// RegisterRelaxServant register Relax servant to gin
func RegisterRelaxServant(e *gin.Engine, s Relax) {
func RegisterRelaxServant(e *gin.Engine, s Relax, m ...RelaxChain) {
var cc RelaxChain
if len(m) > 0 {
cc = m[0]
} else {
cc = &UnimplementedRelaxChain{}
}
router := e.Group("v1")
// use chain for router
middlewares := s.Chain()
router.Use(middlewares...)
// register routes info to router
router.Handle("GET", "/user/msgcount/unread", func(c *gin.Context) {
router.Handle("GET", "/user/msgcount/unread", append(cc.ChainGetUnreadMsgCount(), func(c *gin.Context) {
select {
case <-c.Request.Context().Done():
return
@ -49,7 +61,7 @@ func RegisterRelaxServant(e *gin.Engine, s Relax) {
}
var rv _render_ = resp
rv.Render(c)
})
})...)
}
// UnimplementedRelaxServant can be embedded to have forward compatible implementations.
@ -64,3 +76,12 @@ func (UnimplementedRelaxServant) GetUnreadMsgCount(req *web.GetUnreadMsgCountReq
}
func (UnimplementedRelaxServant) mustEmbedUnimplementedRelaxServant() {}
// UnimplementedRelaxChain can be embedded to have forward compatible implementations.
type UnimplementedRelaxChain struct{}
func (b *UnimplementedRelaxChain) ChainGetUnreadMsgCount() gin.HandlersChain {
return nil
}
func (b *UnimplementedRelaxChain) mustEmbedUnimplementedRelaxChain() {}

@ -8,6 +8,7 @@ import (
"fmt"
"github.com/alimy/tryst/cache"
"github.com/rocboss/paopao-ce/pkg/types"
)
const (
@ -16,12 +17,21 @@ const (
// 以下包含一些在cache中会用到的key的前缀
const (
PrefixNewestTweets = "paopao:newesttweets:"
PrefixHotsTweets = "paopao:hotstweets:"
PrefixFollowingTweets = "paopao:followingtweets:"
PrefixUserTweets = "paopao:usertweets:"
PrefixUnreadmsg = "paopao:unreadmsg:"
PrefixOnlineUser = "paopao:onlineuser:"
)
// 以下包含一些在cache中会用到的池化后的key
var (
KeyNewestTweets cache.KeyPool[int]
KeyHotsTweets cache.KeyPool[int]
KeyFollowingTweets cache.KeyPool[string]
KeyUnreadMsg cache.KeyPool[int64]
KeyOnlineUser cache.KeyPool[int64]
)
func initCacheKeyPool() {
@ -29,7 +39,25 @@ func initCacheKeyPool() {
if poolSize < CacheSetting.KeyPoolSize {
poolSize = CacheSetting.KeyPoolSize
}
KeyUnreadMsg = cache.MustKeyPool[int64](poolSize, func(key int64) string {
return fmt.Sprintf("paopao:unreadmsg:%d", key)
KeyNewestTweets = intKeyPool[int](poolSize, PrefixNewestTweets)
KeyHotsTweets = intKeyPool[int](poolSize, PrefixHotsTweets)
KeyFollowingTweets = strKeyPool(poolSize, PrefixFollowingTweets)
KeyUnreadMsg = intKeyPool[int64](poolSize, PrefixUnreadmsg)
KeyOnlineUser = intKeyPool[int64](poolSize, PrefixOnlineUser)
}
func strKeyPool(size int, prefix string) cache.KeyPool[string] {
return cache.MustKeyPool(size, func(key string) string {
return fmt.Sprintf("%s%s", prefix, key)
})
}
func intKeyPool[T types.Integer](size int, prefix string) cache.KeyPool[T] {
return cache.MustKeyPool[T](size, intKey[T](prefix))
}
func intKey[T types.Integer](prefix string) func(T) string {
return func(key T) string {
return fmt.Sprintf("%s%d", prefix, key)
}
}

@ -37,6 +37,7 @@ var (
AppSetting *appConf
CacheSetting *cacheConf
EventManagerSetting *eventManagerConf
MetricManagerSetting *metricManagerConf
CacheIndexSetting *cacheIndexConf
SimpleCacheIndexSetting *simpleCacheIndexConf
BigCacheIndexSetting *bigCacheIndexConf
@ -73,6 +74,7 @@ func setupSetting(suite []string, noDefault bool) error {
"App": &AppSetting,
"Cache": &CacheSetting,
"EventManager": &EventManagerSetting,
"MetricManager": &MetricManagerSetting,
"PprofServer": &PprofServerSetting,
"WebServer": &WebServerSetting,
"AdminServer": &AdminServerSetting,
@ -121,6 +123,7 @@ func setupSetting(suite []string, noDefault bool) error {
CacheSetting.CientSideCacheExpire *= time.Second
EventManagerSetting.TickWaitTime *= time.Second
MetricManagerSetting.TickWaitTime *= time.Second
JWTSetting.Expire *= time.Second
SimpleCacheIndexSetting.CheckTickDuration *= time.Second
SimpleCacheIndexSetting.ExpireTickDuration *= time.Second

@ -10,12 +10,19 @@ Cache:
CientSideCacheExpire: 60 # 客户端缓存过期时间 默认60s
UnreadMsgExpire: 60 # 未读消息过期时间,单位秒, 默认60s
UserTweetsExpire: 60 # 获取用户推文列表过期时间,单位秒, 默认60s
OnlineUserExpire: 300 # 标记在线用户 过期时间,单位秒, 默认300s
EventManager: # 事件管理器的配置参数
MinWorker: 64 # 最小后台工作者, 设置范围[5, ++], 默认64
MaxEventBuf: 128 # 最大log缓存条数, 设置范围[10, ++], 默认128
MaxTempEventBuf: 256 # 最大log缓存条数, 设置范围[10, ++], 默认256
MaxTickCount: 60 # 最大的循环周期, 设置范围[60, ++], 默认60
TickWaitTime: 1 # 一个周期的等待时间,单位:秒 默认1s
MetricManager: # 指标监控管理器的配置参数
MinWorker: 32 # 最小后台工作者, 设置范围[5, ++], 默认32
MaxEventBuf: 128 # 最大log缓存条数, 设置范围[10, ++], 默认128
MaxTempEventBuf: 256 # 最大log缓存条数, 设置范围[10, ++], 默认256
MaxTickCount: 60 # 最大的循环周期, 设置范围[60, ++], 默认60
TickWaitTime: 1 # 一个周期的等待时间,单位:秒 默认1s
Features:
Default: []
WebServer: # Web服务

@ -101,6 +101,7 @@ type cacheConf struct {
CientSideCacheExpire time.Duration
UnreadMsgExpire int64
UserTweetsExpire int64
OnlineUserExpire int64
}
type eventManagerConf struct {
@ -111,6 +112,14 @@ type eventManagerConf struct {
TickWaitTime time.Duration
}
type metricManagerConf struct {
MinWorker int
MaxEventBuf int
MaxTempEventBuf int
MaxTickCount int
TickWaitTime time.Duration
}
type cacheIndexConf struct {
MaxUpdateQPS int
MinWorker int

@ -101,6 +101,7 @@ type RedisCache interface {
type AppCache interface {
Get(key string) ([]byte, error)
Set(key string, data []byte, ex int64) error
SetNx(key string, data []byte, ex int64) error
Delete(key ...string) error
DelAny(pattern string) error
Exist(key string) bool

@ -39,12 +39,21 @@ func (s *appCache) Get(key string) ([]byte, error) {
}
func (s *appCache) Set(key string, data []byte, ex int64) error {
return s.c.Do(context.Background(), s.c.B().Set().
Key(key).
Value(utils.String(data)).
ExSeconds(ex).
Build()).
Error()
ctx := context.Background()
cmd := s.c.B().Set().Key(key).Value(utils.String(data))
if ex > 0 {
return s.c.Do(ctx, cmd.ExSeconds(ex).Build()).Error()
}
return s.c.Do(ctx, cmd.Build()).Error()
}
func (s *appCache) SetNx(key string, data []byte, ex int64) error {
ctx := context.Background()
cmd := s.c.B().Set().Key(key).Value(utils.String(data)).Nx()
if ex > 0 {
return s.c.Do(ctx, cmd.ExSeconds(ex).Build()).Error()
}
return s.c.Do(ctx, cmd.Build()).Error()
}
func (s *appCache) Delete(keys ...string) (err error) {

@ -8,13 +8,68 @@ import (
"sync"
"github.com/alimy/tryst/cfg"
"github.com/alimy/tryst/pool"
"github.com/robfig/cron/v3"
"github.com/rocboss/paopao-ce/internal/conf"
"github.com/sirupsen/logrus"
)
var (
_defaultEventManager EventManager
_defaultJobManager JobManager
_onceInitial sync.Once
)
func StartEventManager() {
_defaultEventManager.Start()
}
func StopEventManager() {
_defaultEventManager.Stop()
}
// OnEvent push event to gorotine pool then handled automatic.
func OnEvent(event Event) {
_defaultEventManager.OnEvent(event)
}
func StartJobManager() {
_defaultJobManager.Start()
}
func StopJobManager() {
_defaultJobManager.Stop()
}
// NewJob create new Job instance
func NewJob(s cron.Schedule, fn JobFn) Job {
return &simpleJob{
Schedule: s,
Job: fn,
}
}
// RemoveJob an entry from being run in the future.
func RemoveJob(id EntryID) {
_defaultJobManager.Remove(id)
}
// ScheduleJob adds a Job to the Cron to be run on the given schedule.
// The job is wrapped with the configured Chain.
func ScheduleJob(job Job) EntryID {
return _defaultJobManager.Schedule(job)
}
// Schedule adds a Job to the Cron to be run on the given schedule.
// The job is wrapped with the configured Chain.
func Schedule(s cron.Schedule, fn JobFn) EntryID {
job := &simpleJob{
Schedule: s,
Job: fn,
}
return _defaultJobManager.Schedule(job)
}
func Initial() {
_onceInitial.Do(func() {
initEventManager()
@ -24,3 +79,34 @@ func Initial() {
}
})
}
func initJobManager() {
_defaultJobManager = NewJobManager()
StartJobManager()
}
func initEventManager() {
var opts []pool.Option
s := conf.EventManagerSetting
if s.MinWorker > 5 {
opts = append(opts, pool.MinWorkerOpt(s.MinWorker))
} else {
opts = append(opts, pool.MinWorkerOpt(5))
}
if s.MaxEventBuf > 10 {
opts = append(opts, pool.MaxRequestBufOpt(s.MaxEventBuf))
} else {
opts = append(opts, pool.MaxRequestBufOpt(10))
}
if s.MaxTempEventBuf > 10 {
opts = append(opts, pool.MaxRequestTempBufOpt(s.MaxTempEventBuf))
} else {
opts = append(opts, pool.MaxRequestTempBufOpt(10))
}
opts = append(opts, pool.MaxTickCountOpt(s.MaxTickCount), pool.TickWaitTimeOpt(s.TickWaitTime))
_defaultEventManager = NewEventManager(func(req Event, err error) {
if err != nil {
logrus.Errorf("handle event[%s] occurs error: %s", req.Name(), err)
}
}, opts...)
}

@ -0,0 +1,40 @@
// Copyright 2023 ROC. All rights reserved.
// Use of this source code is governed by a MIT style
// license that can be found in the LICENSE file.
package events
import (
"github.com/alimy/tryst/event"
"github.com/alimy/tryst/pool"
)
type Event = event.Event
type EventManager interface {
Start()
Stop()
OnEvent(event Event)
}
type simpleEventManager struct {
em event.EventManager
}
func (s *simpleEventManager) Start() {
s.em.Start()
}
func (s *simpleEventManager) Stop() {
s.em.Stop()
}
func (s *simpleEventManager) OnEvent(event Event) {
s.em.OnEvent(event)
}
func NewEventManager(fn pool.RespFn[Event], opts ...pool.Option) EventManager {
return &simpleEventManager{
em: event.NewEventManager(fn, opts...),
}
}

@ -8,10 +8,6 @@ import (
"github.com/robfig/cron/v3"
)
var (
_defaultJobManager JobManager = (*jobManager)(nil)
)
type (
EntryID = cron.EntryID
)
@ -65,46 +61,8 @@ func (j *jobManager) Schedule(job Job) EntryID {
return j.m.Schedule(job, job)
}
func initJobManager() {
_defaultJobManager = &jobManager{
func NewJobManager() JobManager {
return &jobManager{
m: cron.New(),
}
StartJobManager()
}
func StartJobManager() {
_defaultJobManager.Start()
}
func StopJobManager() {
_defaultJobManager.Stop()
}
// NewJob create new Job instance
func NewJob(s cron.Schedule, fn JobFn) Job {
return &simpleJob{
Schedule: s,
Job: fn,
}
}
// RemoveJob an entry from being run in the future.
func RemoveJob(id EntryID) {
_defaultJobManager.Remove(id)
}
// ScheduleJob adds a Job to the Cron to be run on the given schedule.
// The job is wrapped with the configured Chain.
func ScheduleJob(job Job) EntryID {
return _defaultJobManager.Schedule(job)
}
// Schedule adds a Job to the Cron to be run on the given schedule.
// The job is wrapped with the configured Chain.
func Schedule(s cron.Schedule, fn JobFn) EntryID {
job := &simpleJob{
Schedule: s,
Job: fn,
}
return _defaultJobManager.Schedule(job)
}

@ -6,6 +6,7 @@ package internal
import (
"github.com/rocboss/paopao-ce/internal/events"
"github.com/rocboss/paopao-ce/internal/metrics"
"github.com/rocboss/paopao-ce/internal/migration"
)
@ -14,4 +15,6 @@ func Initial() {
migration.Run()
// event manager system initialize
events.Initial()
// metric manager system initialize
metrics.Initial()
}

@ -2,9 +2,11 @@
// Use of this source code is governed by a MIT style
// license that can be found in the LICENSE file.
package events
package metrics
import (
"sync"
"github.com/alimy/tryst/event"
"github.com/alimy/tryst/pool"
"github.com/rocboss/paopao-ce/internal/conf"
@ -12,10 +14,40 @@ import (
)
var (
_defaultEventManager event.EventManager
_defaultMetricManager event.EventManager
_onceInitial sync.Once
)
func initEventManager() {
type Metric = event.Event
type BaseMetric = event.UnimplementedEvent
type MetricManager interface {
Start()
Stop()
OnMeasure(metric Metric)
}
func StartMetricManager() {
_defaultMetricManager.Start()
}
func StopMetricManager() {
_defaultMetricManager.Stop()
}
// OnMeasure push Metric to gorotine pool then handled automatic.
func OnMeasure(metric Metric) {
_defaultMetricManager.OnEvent(metric)
}
func Initial() {
_onceInitial.Do(func() {
initMetricManager()
})
}
func initMetricManager() {
var opts []pool.Option
s := conf.EventManagerSetting
if s.MinWorker > 5 {
@ -34,22 +66,9 @@ func initEventManager() {
opts = append(opts, pool.MaxRequestTempBufOpt(10))
}
opts = append(opts, pool.MaxTickCountOpt(s.MaxTickCount), pool.TickWaitTimeOpt(s.TickWaitTime))
_defaultEventManager = event.NewEventManager(func(req event.Event, err error) {
_defaultMetricManager = event.NewEventManager(func(req Metric, err error) {
if err != nil {
logrus.Errorf("handle event[%s] occurs error: %s", req.Name(), err)
}
}, opts...)
}
func StartEventManager() {
_defaultEventManager.Start()
}
func StopEventManager() {
_defaultEventManager.Stop()
}
// OnEvent push event to gorotine pool then handled automatic.
func OnEvent(event event.Event) {
_defaultEventManager.OnEvent(event)
}

@ -0,0 +1,32 @@
// Copyright 2023 ROC. All rights reserved.
// Use of this source code is governed by a MIT style
// license that can be found in the LICENSE file.
package metrics
import (
"github.com/alimy/tryst/event"
"github.com/alimy/tryst/pool"
)
type simpleMetricManager struct {
mm event.EventManager
}
func (s *simpleMetricManager) Start() {
s.mm.Start()
}
func (s *simpleMetricManager) Stop() {
s.mm.Stop()
}
func (s *simpleMetricManager) OnMeasure(metric Metric) {
s.mm.OnEvent(metric)
}
func NewMetricManager(fn pool.RespFn[Metric], opts ...pool.Option) MetricManager {
return &simpleMetricManager{
mm: event.NewEventManager(fn, opts...),
}
}

@ -13,6 +13,7 @@ const (
const (
AuditHookCtxKey = "audit_ctx_key"
OnlineUserCtxKey = "online_user_ctx_key"
)
type AuditStyle uint8

@ -99,7 +99,7 @@ func (p *ExpireRespEvent) Name() string {
return "servants.base.ExpireRespEvent"
}
func (p *ExpireRespEvent) Action() (err error) {
func (p *ExpireRespEvent) Action() error {
return p.ac.Delete(p.keys...)
}
@ -107,7 +107,7 @@ func (p *ExpireAnyRespEvent) Name() string {
return "servants.base.ExpireAnyRespEvent"
}
func (p *ExpireAnyRespEvent) Action() (err error) {
func (p *ExpireAnyRespEvent) Action() error {
return p.ac.DelAny(p.pattern)
}

@ -9,16 +9,19 @@ import (
"github.com/rocboss/paopao-ce/internal/core"
"github.com/rocboss/paopao-ce/internal/dao"
"github.com/rocboss/paopao-ce/internal/dao/cache"
)
var (
_ums core.UserManageService
_ac core.AppCache
_onceUms sync.Once
)
func userManageService() core.UserManageService {
_onceUms.Do(func() {
_ums = dao.DataService()
_ac = cache.NewAppCache()
})
return _ums
}

@ -0,0 +1,21 @@
// Copyright 2023 ROC. All rights reserved.
// Use of this source code is governed by a MIT style
// license that can be found in the LICENSE file.
package chain
import (
"github.com/gin-gonic/gin"
"github.com/rocboss/paopao-ce/internal/servants/base"
)
func OnlineUserMeasure() gin.HandlerFunc {
return func(c *gin.Context) {
// 此midleware后面是真正的http handlder让handler先执行
c.Next()
// 更新用户在线状态
if uid, ok := base.UserIdFrom(c); ok {
OnUserOnlineMetric(_ac, uid)
}
}
}

@ -0,0 +1,36 @@
// Copyright 2023 ROC. All rights reserved.
// Use of this source code is governed by a MIT style
// license that can be found in the LICENSE file.
package chain
import (
"github.com/rocboss/paopao-ce/internal/conf"
"github.com/rocboss/paopao-ce/internal/core"
"github.com/rocboss/paopao-ce/internal/metrics"
)
type OnlineUserMetric struct {
metrics.BaseMetric
ac core.AppCache
uid int64
expire int64
}
func OnUserOnlineMetric(ac core.AppCache, uid int64) {
metrics.OnMeasure(&OnlineUserMetric{
ac: ac,
uid: uid,
expire: conf.CacheSetting.OnlineUserExpire,
})
}
func (m *OnlineUserMetric) Name() string {
return "OnlineUserMetric"
}
func (m *OnlineUserMetric) Action() (err error) {
// 暂时仅做标记,不存储其他相关信息
m.ac.Set(conf.KeyOnlineUser.Get(m.uid), []byte{}, m.expire)
return
}

@ -26,6 +26,14 @@ type relaxSrv struct {
wc core.WebCache
}
type relaxChain struct {
api.UnimplementedRelaxChain
}
func (s *relaxChain) ChainGetUnreadMsgCount() gin.HandlersChain {
return gin.HandlersChain{chain.OnlineUserMeasure()}
}
func (s *relaxSrv) Chain() gin.HandlersChain {
return gin.HandlersChain{chain.JwtSurely()}
}
@ -50,3 +58,7 @@ func newRelaxSrv(s *base.DaoServant, wc core.WebCache) api.Relax {
wc: wc,
}
}
func newRelaxChain() api.RelaxChain {
return &relaxChain{}
}

@ -34,7 +34,7 @@ func RouteWeb(e *gin.Engine) {
// aways register servants
api.RegisterAdminServant(e, newAdminSrv(ds))
api.RegisterCoreServant(e, newCoreSrv(ds, _oss, _wc))
api.RegisterRelaxServant(e, newRelaxSrv(ds, _wc))
api.RegisterRelaxServant(e, newRelaxSrv(ds, _wc), newRelaxChain())
api.RegisterLooseServant(e, newLooseSrv(ds, _ac))
api.RegisterPrivServant(e, newPrivSrv(ds, _oss), newPrivChain())
api.RegisterPubServant(e, newPubSrv(ds))

@ -16,5 +16,5 @@ type Relax struct {
Group `mir:"v1"`
// GetUnreadMsgCount 获取当前用户未读消息数量
GetUnreadMsgCount func(Get, web.GetUnreadMsgCountReq) web.GetUnreadMsgCountResp `mir:"/user/msgcount/unread"`
GetUnreadMsgCount func(Get, Chain, web.GetUnreadMsgCountReq) web.GetUnreadMsgCountResp `mir:"/user/msgcount/unread"`
}

@ -23,3 +23,7 @@ type Boxes[T any] interface {
Box(t T)
Unbox() T
}
type Integer interface {
~int8 | ~int16 | ~int32 | ~int64 | ~uint8 | ~uint16 | ~uint32 | ~uint64 | ~int | ~uint
}

Loading…
Cancel
Save