add event manager temp worker count info to statistics metrics

pull/415/head
Michael Li 12 months ago
parent 0260f8290a
commit 5d637fe0d3
No known key found for this signature in database

@ -7,7 +7,7 @@ require (
github.com/RoaringBitmap/roaring v1.6.0
github.com/afocus/captcha v0.0.0-20191010092841-4bd1f21c8868
github.com/alimy/mir/v4 v4.0.0
github.com/alimy/tryst v0.9.0
github.com/alimy/tryst v0.9.1
github.com/aliyun/aliyun-oss-go-sdk v2.2.9+incompatible
github.com/allegro/bigcache/v3 v3.1.0
github.com/bufbuild/connect-go v1.10.0

@ -127,8 +127,8 @@ github.com/alexflint/go-filemutex v0.0.0-20171022225611-72bdc8eae2ae/go.mod h1:C
github.com/alexflint/go-filemutex v1.1.0/go.mod h1:7P4iRhttt/nUvUOrYIhcpMzv2G6CY9UnI16Z+UJqRyk=
github.com/alimy/mir/v4 v4.0.0 h1:MzGfmoLjjvR69jbZEmpKJO3tUuqB0RGRv1UWPbtukBg=
github.com/alimy/mir/v4 v4.0.0/go.mod h1:d58dBvw2KImcVbAUANrciEV/of0arMNsI9c/5UNCMMc=
github.com/alimy/tryst v0.9.0 h1:+pKAeNMD9XDn3Z76LjzCcHacim7SXFGVwJf38i7eW60=
github.com/alimy/tryst v0.9.0/go.mod h1:6FcqEImav7S62em+p+MODh+stt/UPp23HobUOK3XwFY=
github.com/alimy/tryst v0.9.1 h1:EnsvdS/kvUJ7IybrjFdBMGkAiHRTXx0bScdZSUhZb4w=
github.com/alimy/tryst v0.9.1/go.mod h1:6FcqEImav7S62em+p+MODh+stt/UPp23HobUOK3XwFY=
github.com/aliyun/aliyun-oss-go-sdk v2.2.9+incompatible h1:Sg/2xHwDrioHpxTN6WMiwbXTpUEinBpHsN7mG21Rc2k=
github.com/aliyun/aliyun-oss-go-sdk v2.2.9+incompatible/go.mod h1:T/Aws4fEfogEE9v+HPhhw+CntffsBHJ8nXQCwKr0/g8=
github.com/allegro/bigcache/v3 v3.1.0 h1:H2Vp8VOvxcrB91o86fUSVJFqeuz8kpyyB02eH3bSzwk=

@ -116,3 +116,8 @@ type WebCache interface {
ExistUnreadMsgCountResp(uid int64) bool
PutHistoryMaxOnline(newScore int) (int, error)
}
type MetricCache interface {
SetEventTempWorkerCount(name string, count int)
GetEventTempWorkerCount(name string) int
}

@ -11,6 +11,7 @@ import (
"github.com/alimy/tryst/pool"
"github.com/robfig/cron/v3"
"github.com/rocboss/paopao-ce/internal/conf"
"github.com/rocboss/paopao-ce/internal/metrics/statistics"
"github.com/sirupsen/logrus"
)
@ -103,7 +104,10 @@ func initEventManager() {
} else {
opts = append(opts, pool.MaxRequestTempBufOpt(10))
}
opts = append(opts, pool.MaxTickCountOpt(s.MaxTickCount), pool.TickWaitTimeOpt(s.TickWaitTime))
opts = append(opts,
pool.MaxTickCountOpt(s.MaxTickCount),
pool.TickWaitTimeOpt(s.TickWaitTime),
pool.WorkerHookOpt(NewEventWorkerHook("default", statistics.NewMetricCache())))
_defaultEventManager = NewEventManager(func(req Event, err error) {
if err != nil {
logrus.Errorf("handle event[%s] occurs error: %s", req.Name(), err)

@ -7,6 +7,7 @@ package events
import (
"github.com/alimy/tryst/event"
"github.com/alimy/tryst/pool"
"github.com/rocboss/paopao-ce/internal/core"
)
type Event = event.Event
@ -17,6 +18,11 @@ type EventManager interface {
OnEvent(event Event)
}
type simpleWorkerHook struct {
name string
cache core.MetricCache
}
type simpleEventManager struct {
em event.EventManager
}
@ -33,8 +39,23 @@ func (s *simpleEventManager) OnEvent(event Event) {
s.em.OnEvent(event)
}
func (h *simpleWorkerHook) OnJoin(count int) {
h.cache.SetEventTempWorkerCount(h.name, count)
}
func (h *simpleWorkerHook) OnLeave(count int) {
h.cache.SetEventTempWorkerCount(h.name, count)
}
func NewEventManager(fn pool.RespFn[Event], opts ...pool.Option) EventManager {
return &simpleEventManager{
em: event.NewEventManager(fn, opts...),
}
}
func NewEventWorkerHook(name string, mc core.MetricCache) pool.WorkerHook {
return &simpleWorkerHook{
name: name,
cache: mc,
}
}

@ -13,8 +13,10 @@ import (
type metrics struct {
siteInfo *prometheus.GaugeVec
runtime *prometheus.GaugeVec
ds core.DataService
wc core.WebCache
mc core.MetricCache
}
func (m *metrics) updateSiteInfo() {
@ -31,13 +33,21 @@ func (m *metrics) updateSiteInfo() {
}
}
func (m *metrics) updateRuntime() {
m.runtime.With(prometheus.Labels{"name": "default"}).Set(float64(m.mc.GetEventTempWorkerCount("default")))
}
func (m *metrics) onUpdate() {
logrus.Debugf("update promethues metrics job running")
m.updateSiteInfo()
m.updateRuntime()
}
func newMetrics(reg prometheus.Registerer, ds core.DataService, wc core.WebCache) *metrics {
func newMetrics(reg prometheus.Registerer, ds core.DataService, wc core.WebCache, mc core.MetricCache) *metrics {
m := &metrics{
ds: ds,
wc: wc,
mc: mc,
siteInfo: prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "paopao",
@ -49,9 +59,18 @@ func newMetrics(reg prometheus.Registerer, ds core.DataService, wc core.WebCache
// metric name
"name",
}),
ds: ds,
wc: wc,
runtime: prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "runtime",
Subsystem: "event_manager",
Name: "temp_worker_count",
Help: "runtime's event manager temp worker count info",
},
[]string{
// metric name
"name",
}),
}
reg.MustRegister(m.siteInfo)
reg.MustRegister(m.siteInfo, m.runtime)
return m
}

@ -27,7 +27,7 @@ func scheduleJobs(metrics *metrics) {
logrus.Debug("shedule prometheus metrics update jobs complete")
}
func NewHandler(ds core.DataService, wc core.WebCache) http.Handler {
func NewHandler(ds core.DataService, wc core.WebCache, mc core.MetricCache) http.Handler {
// Create non-global registry.
registry := prometheus.NewRegistry()
// Add go runtime metrics and process collectors.
@ -35,7 +35,7 @@ func NewHandler(ds core.DataService, wc core.WebCache) http.Handler {
collectors.NewGoCollector(),
collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}),
)
metrics := newMetrics(registry, ds, wc)
metrics := newMetrics(registry, ds, wc, mc)
scheduleJobs(metrics)
return promhttp.HandlerFor(registry, promhttp.HandlerOpts{EnableOpenMetrics: true})
}

@ -0,0 +1,38 @@
// Copyright 2023 ROC. All rights reserved.
// Use of this source code is governed by a MIT style
// license that can be found in the LICENSE file.
package statistics
import (
"sync"
"github.com/rocboss/paopao-ce/internal/core"
)
var (
_metricCache core.MetricCache
_onceMetricCache sync.Once
)
type metricCache struct {
eventTempWorkerCount map[string]int
}
func (m *metricCache) SetEventTempWorkerCount(name string, count int) {
// 直接赋值,不需要加锁,因为这仅仅是一个统计信息
m.eventTempWorkerCount[name] = count
}
func (m *metricCache) GetEventTempWorkerCount(name string) int {
return m.eventTempWorkerCount[name]
}
func NewMetricCache() core.MetricCache {
_onceMetricCache.Do(func() {
_metricCache = &metricCache{
eventTempWorkerCount: make(map[string]int),
}
})
return _metricCache
}

@ -14,6 +14,7 @@ import (
"github.com/rocboss/paopao-ce/internal/dao"
"github.com/rocboss/paopao-ce/internal/dao/cache"
"github.com/rocboss/paopao-ce/internal/metrics/prometheus"
"github.com/rocboss/paopao-ce/internal/metrics/statistics"
)
var (
@ -44,9 +45,9 @@ func (s *metricsService) String() string {
func newMetricsService() Service {
addr := conf.MetricsServerSetting.HttpIp + ":" + conf.MetricsServerSetting.HttpPort
server := httpServers.from(addr, func() *httpServer {
ds, wc := dao.DataService(), cache.NewWebCache()
ds, wc, mc := dao.DataService(), cache.NewWebCache(), statistics.NewMetricCache()
mux := http.NewServeMux()
mux.Handle("/metrics", prometheus.NewHandler(ds, wc))
mux.Handle("/metrics", prometheus.NewHandler(ds, wc, mc))
return &httpServer{
baseServer: newBaseServe(),
server: &http.Server{

Loading…
Cancel
Save