Add GRPC and gin server monitoring logic

pull/1337/head
lin.huang 2 years ago
parent a4a195aad3
commit ea8f720d5c

@@ -84,7 +84,7 @@ func run(port int, proPort int) error {
log.ZInfo(context.Background(), "api register public config to discov success")
router := api.NewGinRouter(client, rdb)
//////////////////////////////
-p := ginProm.NewPrometheus("app", prom_metrics.G_api_metrics.MetricList())
+p := ginProm.NewPrometheus("app", prom_metrics.GetGinCusMetrics("Api"))
p.SetListenAddress(fmt.Sprintf(":%d", proPort))
p.Use(router)
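
For orientation: the replacement line above switches the API's custom gin metrics from the old global G_api_metrics map to the new GetGinCusMetrics("Api") helper introduced later in this diff. A minimal sketch of the wiring this hunk belongs to (the standalone main and both port numbers are assumptions for illustration, not part of the commit):

package main

import (
	"fmt"

	"github.com/gin-gonic/gin"
	ginProm "github.com/openimsdk/open-im-server/v3/pkg/common/ginPrometheus"
	"github.com/openimsdk/open-im-server/v3/pkg/common/prom_metrics"
)

func main() {
	router := gin.Default()
	// "app" is the metric subsystem; GetGinCusMetrics("Api") supplies the
	// custom metrics (ApiCustomCnt) defined later in this diff.
	p := ginProm.NewPrometheus("app", prom_metrics.GetGinCusMetrics("Api"))
	p.SetListenAddress(fmt.Sprintf(":%d", 20100)) // assumed metrics port (proPort in the hunk)
	p.Use(router)                                 // install the request-metrics middleware
	_ = router.Run(":10002")                      // assumed API port
}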

@@ -17,6 +17,7 @@ package msggateway
import (
"context"
"errors"
"github.com/openimsdk/open-im-server/v3/pkg/common/prom_metrics"
"net/http"
"strconv"
"sync"
@@ -222,6 +223,7 @@ func (ws *WsServer) registerClient(client *Client) {
log.ZDebug(client.ctx, "user not exist", "userID", client.UserID, "platformID", client.PlatformID)
atomic.AddInt64(&ws.onlineUserNum, 1)
atomic.AddInt64(&ws.onlineUserConnNum, 1)
+prom_metrics.OnlineUserGauge.Add(1)
} else {
i := &kickHandler{
clientOK: clientOK,
@@ -350,6 +352,7 @@ func (ws *WsServer) unregisterClient(client *Client) {
defer ws.clientPool.Put(client)
isDeleteUser := ws.clients.delete(client.UserID, client.ctx.GetRemoteAddr())
if isDeleteUser {
+prom_metrics.OnlineUserGauge.Dec()
atomic.AddInt64(&ws.onlineUserNum, -1)
}
atomic.AddInt64(&ws.onlineUserConnNum, -1)
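
Taken together, the two gateway hunks keep OnlineUserGauge at user granularity: it is incremented only when a previously unseen user registers its first connection, and decremented only when a user's last connection is removed, while per-connection totals stay in the atomic counters. A compact sketch of that discipline (the trackConnect/trackDisconnect helper names are illustrative, not from the diff):

// Gauge tracks users; atomics track connections.
func trackConnect(ws *WsServer, userExisted bool) {
	atomic.AddInt64(&ws.onlineUserConnNum, 1)
	if !userExisted {
		atomic.AddInt64(&ws.onlineUserNum, 1)
		prom_metrics.OnlineUserGauge.Add(1) // first connection: user comes online
	}
}

func trackDisconnect(ws *WsServer, isDeleteUser bool) {
	atomic.AddInt64(&ws.onlineUserConnNum, -1)
	if isDeleteUser {
		atomic.AddInt64(&ws.onlineUserNum, -1)
		prom_metrics.OnlineUserGauge.Dec() // last connection gone: user goes offline
	}
}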

@@ -15,10 +15,17 @@
package msgtransfer
import (
"errors"
"fmt"
"github.com/openimsdk/open-im-server/v3/pkg/common/discovery_register"
"github.com/openimsdk/open-im-server/v3/pkg/common/prom_metrics"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/collectors"
"github.com/prometheus/client_golang/prometheus/promhttp"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"log"
"net/http"
"sync"
"github.com/OpenIMSDK/tools/mw"
@@ -108,6 +115,9 @@ func (m *MsgTransfer) Start(prometheusPort int) error {
var wg sync.WaitGroup
wg.Add(1)
fmt.Println("start msg transfer", "prometheusPort:", prometheusPort)
if prometheusPort <= 0 {
return errors.New("prometheusPort not correct")
}
if config.Config.ChatPersistenceMysql {
// go m.persistentCH.persistentConsumerGroup.RegisterHandleAndConsumer(m.persistentCH)
} else {
@@ -116,10 +126,19 @@ func (m *MsgTransfer) Start(prometheusPort int) error {
go m.historyCH.historyConsumerGroup.RegisterHandleAndConsumer(m.historyCH)
go m.historyMongoCH.historyConsumerGroup.RegisterHandleAndConsumer(m.historyMongoCH)
// go m.modifyCH.modifyMsgConsumerGroup.RegisterHandleAndConsumer(m.modifyCH)
-err := prome.StartPrometheusSrv(prometheusPort)
+/*err := prome.StartPrometheusSrv(prometheusPort)
if err != nil {
return err
-}
+}*/
+////////////////////////////
+reg := prometheus.NewRegistry()
+reg.MustRegister(
+collectors.NewGoCollector(),
+)
+reg.MustRegister(prom_metrics.GetGrpcCusMetrics("Transfer")...)
+http.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{Registry: reg}))
+log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", prometheusPort), nil))
+////////////////////////////////////////
wg.Wait()
return nil
}
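
One caveat about the hunk above: http.ListenAndServe blocks, and log.Fatal exits the process on any serve error, so the wg.Wait() that follows is effectively unreachable as written. A hedged variant that serves the same registry without blocking Start (same calls as the diff, only moved into a goroutine and given a dedicated mux):

reg := prometheus.NewRegistry()
reg.MustRegister(collectors.NewGoCollector())
reg.MustRegister(prom_metrics.GetGrpcCusMetrics("Transfer")...)

mux := http.NewServeMux()
mux.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{Registry: reg}))
go func() {
	// Surface serve errors instead of killing the process with log.Fatal.
	if err := http.ListenAndServe(fmt.Sprintf(":%d", prometheusPort), mux); err != nil {
		log.Println("metrics listener stopped:", err)
	}
}()
wg.Wait()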

@@ -16,6 +16,7 @@ package msgtransfer
import (
"context"
"github.com/openimsdk/open-im-server/v3/pkg/common/prom_metrics"
"github.com/IBM/sarama"
"google.golang.org/protobuf/proto"
@@ -74,6 +75,9 @@ func (mc *OnlineHistoryMongoConsumerHandler) handleChatWs2Mongo(
"conversationID",
msgFromMQ.ConversationID,
)
+prom_metrics.MsgInsertMongoFailedCounter.Inc()
+} else {
+prom_metrics.MsgInsertMongoSuccessCounter.Inc()
}
var seqs []int64
for _, msg := range msgFromMQ.MsgData {

@@ -18,6 +18,7 @@ import (
"context"
"encoding/json"
"errors"
"github.com/openimsdk/open-im-server/v3/pkg/common/prom_metrics"
"github.com/OpenIMSDK/protocol/conversation"
@@ -39,7 +40,6 @@ import (
"github.com/openimsdk/open-im-server/v3/pkg/common/db/cache"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/controller"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/localcache"
"github.com/openimsdk/open-im-server/v3/pkg/common/prome"
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
)
@@ -285,10 +285,9 @@ func (p *Pusher) offlinePushMsg(ctx context.Context, conversationID string, msg
}
err = p.offlinePusher.Push(ctx, offlinePushUserIDs, title, content, opts)
if err != nil {
-prome.Inc(prome.MsgOfflinePushFailedCounter)
+prom_metrics.MsgOfflinePushFailedCounter.Inc()
return err
}
-prome.Inc(prome.MsgOfflinePushSuccessCounter)
return nil
}

@@ -16,6 +16,7 @@ package auth
import (
"context"
"github.com/openimsdk/open-im-server/v3/pkg/common/prom_metrics"
"github.com/openimsdk/open-im-server/v3/pkg/authverify"
@@ -73,6 +74,7 @@ func (s *authServer) UserToken(ctx context.Context, req *pbauth.UserTokenReq) (*
if err != nil {
return nil, err
}
+prom_metrics.UserLoginCounter.Inc()
resp.Token = token
resp.ExpireTimeSeconds = config.Config.TokenPolicy.Expire * 24 * 60 * 60
return &resp, nil

@@ -16,6 +16,7 @@ package msg
import (
"context"
"github.com/openimsdk/open-im-server/v3/pkg/common/prom_metrics"
"github.com/openimsdk/open-im-server/v3/pkg/msgprocessor"
@@ -59,9 +60,8 @@ func (m *msgServer) sendMsgSuperGroupChat(
ctx context.Context,
req *pbmsg.SendMsgReq,
) (resp *pbmsg.SendMsgResp, err error) {
-promepkg.Inc(promepkg.WorkSuperGroupChatMsgRecvSuccessCounter)
if err = m.messageVerification(ctx, req); err != nil {
-promepkg.Inc(promepkg.WorkSuperGroupChatMsgProcessFailedCounter)
+prom_metrics.GroupChatMsgProcessFailedCounter.Inc()
return nil, err
}
if err = callbackBeforeSendGroupMsg(ctx, req); err != nil {
@@ -80,7 +80,7 @@ func (m *msgServer) sendMsgSuperGroupChat(
if err = callbackAfterSendGroupMsg(ctx, req); err != nil {
log.ZWarn(ctx, "CallbackAfterSendGroupMsg", err)
}
-promepkg.Inc(promepkg.WorkSuperGroupChatMsgProcessSuccessCounter)
+prom_metrics.GroupChatMsgProcessSuccessCounter.Inc()
resp = &pbmsg.SendMsgResp{}
resp.SendTime = req.MsgData.SendTime
resp.ServerMsgID = req.MsgData.ServerMsgID
@@ -147,7 +147,6 @@ func (m *msgServer) sendMsgNotification(
}
func (m *msgServer) sendMsgSingleChat(ctx context.Context, req *pbmsg.SendMsgReq) (resp *pbmsg.SendMsgResp, err error) {
-promepkg.Inc(promepkg.SingleChatMsgRecvSuccessCounter)
if err := m.messageVerification(ctx, req); err != nil {
return nil, err
}
@@ -166,7 +165,7 @@ func (m *msgServer) sendMsgSingleChat(ctx context.Context, req *pbmsg.SendMsgReq
}
}
if !isSend {
-promepkg.Inc(promepkg.SingleChatMsgProcessFailedCounter)
+prom_metrics.SingleChatMsgProcessFailedCounter.Inc()
return nil, nil
} else {
if err = callbackBeforeSendSingleMsg(ctx, req); err != nil {
@@ -176,7 +175,7 @@ func (m *msgServer) sendMsgSingleChat(ctx context.Context, req *pbmsg.SendMsgReq
return nil, err
}
if err := m.MsgDatabase.MsgToMQ(ctx, utils.GenConversationUniqueKeyForSingle(req.MsgData.SendID, req.MsgData.RecvID), req.MsgData); err != nil {
-promepkg.Inc(promepkg.SingleChatMsgProcessFailedCounter)
+prom_metrics.SingleChatMsgProcessFailedCounter.Inc()
return nil, err
}
err = callbackAfterSendSingleMsg(ctx, req)
@@ -188,7 +187,7 @@ func (m *msgServer) sendMsgSingleChat(ctx context.Context, req *pbmsg.SendMsgReq
ClientMsgID: req.MsgData.ClientMsgID,
SendTime: req.MsgData.SendTime,
}
-promepkg.Inc(promepkg.SingleChatMsgProcessSuccessCounter)
+prom_metrics.SingleChatMsgProcessSuccessCounter.Inc()
return resp, nil
}
}

@@ -15,6 +15,9 @@
package cmd
import (
"fmt"
"github.com/OpenIMSDK/protocol/constant"
config2 "github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/spf13/cobra"
"github.com/openimsdk/open-im-server/v3/internal/msgtransfer"
@@ -40,3 +43,13 @@ func (m *MsgTransferCmd) Exec() error {
m.addRunE()
return m.Execute()
}
+func (a *MsgTransferCmd) GetPortFromConfig(portType string) int {
+fmt.Println("GetPortFromConfig:", portType)
+if portType == constant.FlagPort {
+return 0
+} else if portType == constant.FlagPrometheusPort {
+return config2.Config.Prometheus.MessageTransferPrometheusPort[0]
+}
+return 0
+}

@@ -17,6 +17,7 @@ package controller
import (
"context"
"errors"
"github.com/openimsdk/open-im-server/v3/pkg/common/prom_metrics"
"time"
"github.com/redis/go-redis/v9"
@@ -355,10 +356,9 @@ func (db *commonMsgDatabase) DelUserDeleteMsgsList(ctx context.Context, conversa
func (db *commonMsgDatabase) BatchInsertChat2Cache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (seq int64, isNew bool, err error) {
currentMaxSeq, err := db.cache.GetMaxSeq(ctx, conversationID)
if err != nil && errs.Unwrap(err) != redis.Nil {
-prome.Inc(prome.SeqGetFailedCounter)
log.ZError(ctx, "db.cache.GetMaxSeq", err)
return 0, false, err
}
-prome.Inc(prome.SeqGetSuccessCounter)
lenList := len(msgs)
if int64(lenList) > db.msg.GetSingleGocMsgNum() {
return 0, false, errors.New("too large")
@@ -378,23 +378,20 @@ func (db *commonMsgDatabase) BatchInsertChat2Cache(ctx context.Context, conversa
}
failedNum, err := db.cache.SetMessageToCache(ctx, conversationID, msgs)
if err != nil {
-prome.Add(prome.MsgInsertRedisFailedCounter, failedNum)
+prom_metrics.MsgInsertRedisFailedCounter.Add(float64(failedNum))
log.ZError(ctx, "setMessageToCache error", err, "len", len(msgs), "conversationID", conversationID)
} else {
-prome.Inc(prome.MsgInsertRedisSuccessCounter)
+prom_metrics.MsgInsertRedisSuccessCounter.Inc()
}
err = db.cache.SetMaxSeq(ctx, conversationID, currentMaxSeq)
if err != nil {
-prome.Inc(prome.SeqSetFailedCounter)
-} else {
-prome.Inc(prome.SeqSetSuccessCounter)
+log.ZError(ctx, "db.cache.SetMaxSeq error", err, "conversationID", conversationID)
+prom_metrics.SeqSetFailedCounter.Inc()
}
err2 := db.cache.SetHasReadSeqs(ctx, conversationID, userSeqMap)
if err != nil {
log.ZError(ctx, "SetHasReadSeqs error", err2, "userSeqMap", userSeqMap, "conversationID", conversationID)
-prome.Inc(prome.SeqSetFailedCounter)
-} else {
-prome.Inc(prome.SeqSetSuccessCounter)
+prom_metrics.SeqSetFailedCounter.Inc()
}
return lastMaxSeq, isNew, utils.Wrap(err, "")
}

@@ -16,18 +16,6 @@ import (
var defaultMetricPath = "/metrics"
-type CusMetrics struct {
-MetricsMap map[string]*Metric
-}
-func (m *CusMetrics) MetricList() []*Metric {
-var ret []*Metric
-for _, v := range m.MetricsMap {
-ret = append(ret, v)
-}
-return ret
-}
// counter, counter_vec, gauge, gauge_vec,
// histogram, histogram_vec, summary, summary_vec
var reqCnt = &Metric{

@@ -0,0 +1,44 @@
+package prom_metrics
+import (
+grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
+"github.com/openimsdk/open-im-server/v3/pkg/common/ginPrometheus"
+"github.com/prometheus/client_golang/prometheus"
+"github.com/prometheus/client_golang/prometheus/collectors"
+)
+func NewGrpcPromObj(cusMetrics []prometheus.Collector) (*prometheus.Registry, *grpc_prometheus.ServerMetrics, error) {
+////////////////////////////////////////////////////////
+reg := prometheus.NewRegistry()
+grpcMetrics := grpc_prometheus.NewServerMetrics()
+grpcMetrics.EnableHandlingTimeHistogram()
+cusMetrics = append(cusMetrics, grpcMetrics, collectors.NewGoCollector())
+reg.MustRegister(cusMetrics...)
+return reg, grpcMetrics, nil
+}
+func GetGrpcCusMetrics(registerName string) []prometheus.Collector {
+switch registerName {
+case "MessageGateway":
+return []prometheus.Collector{OnlineUserGauge}
+case "Msg":
+return []prometheus.Collector{SingleChatMsgProcessSuccessCounter, SingleChatMsgProcessFailedCounter, GroupChatMsgProcessSuccessCounter, GroupChatMsgProcessFailedCounter}
+case "Transfer":
+return []prometheus.Collector{MsgInsertRedisSuccessCounter, MsgInsertRedisFailedCounter, MsgInsertMongoSuccessCounter, MsgInsertMongoFailedCounter, SeqSetFailedCounter}
+case "Push":
+return []prometheus.Collector{MsgOfflinePushFailedCounter}
+case "Auth":
+return []prometheus.Collector{UserLoginCounter}
+default:
+return nil
+}
+}
+func GetGinCusMetrics(name string) []*ginPrometheus.Metric {
+switch name {
+case "Api":
+return []*ginPrometheus.Metric{ApiCustomCnt}
+default:
+return []*ginPrometheus.Metric{ApiCustomCnt}
+}
+}
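
The two halves of this new file meet in the RPC bootstrap (see the startrpc hunk at the end of this diff): GetGrpcCusMetrics selects the per-service collectors by register name, and NewGrpcPromObj folds them into one registry together with the go-grpc-prometheus server metrics. A sketch of that consumption, assuming the "Auth" service (InitializeMetrics is standard go-grpc-prometheus API, not shown in this diff):

cusMetrics := prom_metrics.GetGrpcCusMetrics("Auth") // -> []prometheus.Collector{UserLoginCounter}
reg, metric, err := prom_metrics.NewGrpcPromObj(cusMetrics)
if err != nil {
	return err
}
srv := grpc.NewServer(
	grpc.StreamInterceptor(metric.StreamServerInterceptor()),
	grpc.UnaryInterceptor(metric.UnaryServerInterceptor()),
)
// ... register the pbauth service on srv ...
metric.InitializeMetrics(srv) // pre-populate per-method counters at zero
http.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{}))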

@@ -4,21 +4,13 @@ import ginProm "github.com/openimsdk/open-im-server/v3/pkg/common/ginPrometheus"
/*
labels := prometheus.Labels{"label_one": "any", "label_two": "value"}
-G_api_metrics.MetricsMap["custom_total"].MetricCollector.(*prometheus.CounterVec).With(labels).Inc()
+ApiCustomCnt.MetricCollector.(*prometheus.CounterVec).With(labels).Inc()
*/
var (
-G_api_metrics *ginProm.CusMetrics
-)
-func init() {
-CustomCnt := &ginProm.Metric{
+ApiCustomCnt = &ginProm.Metric{
Name: "custom_total",
Description: "Custom counter events.",
Type: "counter_vec",
Args: []string{"label_one", "label_two"},
}
-tMetrics := make(map[string]*ginProm.Metric)
-tMetrics["custom_total"] = CustomCnt
-G_api_metrics = &ginProm.CusMetrics{MetricsMap: tMetrics}
-}
+)
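
The doc comment kept at the top of this file already shows the intended call pattern; expanded into a compilable form (the label names come from Args above, and MetricCollector is populated once ginProm.NewPrometheus has registered the metric):

// Increment the custom counter with both declared labels.
labels := prometheus.Labels{"label_one": "any", "label_two": "value"}
prom_metrics.ApiCustomCnt.MetricCollector.(*prometheus.CounterVec).With(labels).Inc()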

@@ -0,0 +1,12 @@
+package prom_metrics
+import (
+"github.com/prometheus/client_golang/prometheus"
+)
+var (
+UserLoginCounter = prometheus.NewCounter(prometheus.CounterOpts{
+Name: "user_login_total",
+Help: "The number of user login",
+})
+)

@@ -0,0 +1,24 @@
+package prom_metrics
+import (
+"github.com/prometheus/client_golang/prometheus"
+)
+var (
+SingleChatMsgProcessSuccessCounter = prometheus.NewCounter(prometheus.CounterOpts{
+Name: "single_chat_msg_process_success_total",
+Help: "The number of single chat msg successful processed",
+})
+SingleChatMsgProcessFailedCounter = prometheus.NewCounter(prometheus.CounterOpts{
+Name: "single_chat_msg_process_failed_total",
+Help: "The number of single chat msg failed processed",
+})
+GroupChatMsgProcessSuccessCounter = prometheus.NewCounter(prometheus.CounterOpts{
+Name: "group_chat_msg_process_success_total",
+Help: "The number of group chat msg successful processed",
+})
+GroupChatMsgProcessFailedCounter = prometheus.NewCounter(prometheus.CounterOpts{
+Name: "group_chat_msg_process_failed_total",
+Help: "The number of group chat msg failed processed",
+})
+)

@@ -4,20 +4,9 @@ import (
"github.com/prometheus/client_golang/prometheus"
)
-/*
-labels := prometheus.Labels{"label_one": "any", "label_two": "value"}
-G_grpc_msggateway_metrics.MetricsMap["demo_server_say_hello_method_handle_count"].(*prometheus.CounterVec).With(labels).Inc()
-*/
var (
-G_grpc_msggateway_metrics *GrpcCusMetricsMap
+OnlineUserGauge = prometheus.NewGauge(prometheus.GaugeOpts{
+Name: "online_user_num",
+Help: "The number of online user num",
+})
)
-func init() {
-customizedCounterMetric := prometheus.NewCounterVec(prometheus.CounterOpts{
-Name: "demo_server_say_hello_method_handle_count",
-Help: "Total number of RPCs handled on the server.",
-}, []string{"name"})
-tMetrics := make(map[string]prometheus.Collector)
-tMetrics["demo_server_say_hello_method_handle_count"] = customizedCounterMetric
-G_grpc_msggateway_metrics = &GrpcCusMetricsMap{MetricsMap: tMetrics}
-}

@@ -1,42 +0,0 @@
-package prom_metrics
-import (
-grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
-"github.com/prometheus/client_golang/prometheus"
-)
-type GrpcCusMetricsMap struct {
-MetricsMap map[string]prometheus.Collector
-}
-func (m *GrpcCusMetricsMap) MetricList() []prometheus.Collector {
-var ret []prometheus.Collector
-for _, v := range m.MetricsMap {
-ret = append(ret, v)
-}
-return ret
-}
-func NewGrpcPromObj(cusMetrics []prometheus.Collector) (*prometheus.Registry, *grpc_prometheus.ServerMetrics, error) {
-////////////////////////////////////////////////////////
-reg := prometheus.NewRegistry()
-grpcMetrics := grpc_prometheus.NewServerMetrics()
-grpcMetrics.EnableHandlingTimeHistogram()
-cusMetrics = append(cusMetrics, grpcMetrics, prometheus.NewGoCollector())
-reg.MustRegister(cusMetrics...)
-return reg, grpcMetrics, nil
-}
-func GetGrpcCusMetrics(name string) (*GrpcCusMetricsMap, error) {
-switch name {
-case "MessageGateway":
-return G_grpc_msggateway_metrics, nil
-case "User":
-return G_grpc_msggateway_metrics, nil
-case "Msg":
-return G_grpc_msggateway_metrics, nil
-case "Conversation":
-return G_grpc_msggateway_metrics, nil
-default:
-return G_grpc_msggateway_metrics, nil
-}
-}

@@ -0,0 +1,12 @@
+package prom_metrics
+import (
+"github.com/prometheus/client_golang/prometheus"
+)
+var (
+MsgOfflinePushFailedCounter = prometheus.NewCounter(prometheus.CounterOpts{
+Name: "msg_offline_push_failed_total",
+Help: "The number of msg failed offline pushed",
+})
+)

@@ -0,0 +1,28 @@
+package prom_metrics
+import (
+"github.com/prometheus/client_golang/prometheus"
+)
+var (
+MsgInsertRedisSuccessCounter = prometheus.NewCounter(prometheus.CounterOpts{
+Name: "msg_insert_redis_success_total",
+Help: "The number of successful insert msg to redis",
+})
+MsgInsertRedisFailedCounter = prometheus.NewCounter(prometheus.CounterOpts{
+Name: "msg_insert_redis_failed_total",
+Help: "The number of failed insert msg to redis",
+})
+MsgInsertMongoSuccessCounter = prometheus.NewCounter(prometheus.CounterOpts{
+Name: "msg_insert_mongo_success_total",
+Help: "The number of successful insert msg to mongo",
+})
+MsgInsertMongoFailedCounter = prometheus.NewCounter(prometheus.CounterOpts{
+Name: "msg_insert_mongo_failed_total",
+Help: "The number of failed insert msg to mongo",
+})
+SeqSetFailedCounter = prometheus.NewCounter(prometheus.CounterOpts{
+Name: "seq_set_failed_total",
+Help: "The number of failed set seq",
+})
+)

@@ -70,12 +70,8 @@ func Start(
// ctx middleware
if config.Config.Prometheus.Enable {
//////////////////////////
-cusMetrics, err := prom_metrics.GetGrpcCusMetrics(rpcRegisterName)
-if err != nil {
-fmt.Println("prom_metrics.GetGrpcCusMetrics error")
-return err
-}
-reg, metric, err = prom_metrics.NewGrpcPromObj(cusMetrics.MetricList())
+cusMetrics := prom_metrics.GetGrpcCusMetrics(rpcRegisterName)
+reg, metric, err = prom_metrics.NewGrpcPromObj(cusMetrics)
options = append(options, mw.GrpcServer(), grpc.StreamInterceptor(metric.StreamServerInterceptor()),
grpc.UnaryInterceptor(metric.UnaryServerInterceptor()))
} else {
