diff --git a/internal/api/route.go b/internal/api/route.go index 3305a2d0d..32f8cafde 100644 --- a/internal/api/route.go +++ b/internal/api/route.go @@ -32,7 +32,7 @@ func NewGinRouter() *gin.Engine { prome.NewApiRequestCounter() prome.NewApiRequestFailedCounter() prome.NewApiRequestSuccessCounter() - r.Use(prome.PromeTheusMiddleware) + r.Use(prome.PrometheusMiddleware) r.GET("/metrics", prome.PrometheusHandler()) } userRouterGroup := r.Group("/user") diff --git a/internal/msggateway/init.go b/internal/msggateway/init.go index 7798a55eb..9945e3277 100644 --- a/internal/msggateway/init.go +++ b/internal/msggateway/init.go @@ -40,7 +40,7 @@ func Run(prometheusPort int) { go ws.run() go rpcSvr.run() go func() { - err := prome.StartPromeSrv(prometheusPort) + err := prome.StartPrometheusSrv(prometheusPort) if err != nil { panic(err) } diff --git a/internal/msggateway/logic.go b/internal/msggateway/logic.go index 3533951de..1c22b0af4 100644 --- a/internal/msggateway/logic.go +++ b/internal/msggateway/logic.go @@ -50,18 +50,18 @@ func (ws *WServer) msgParse(conn *UserConn, binaryMsg []byte) { case constant.WSGetNewestSeq: log.NewInfo(m.OperationID, "getSeqReq ", m.SendID, m.MsgIncr, m.ReqIdentifier) ws.getSeqReq(conn, &m) - prome.PromeInc(prome.GetNewestSeqTotalCounter) + prome.Inc(prome.GetNewestSeqTotalCounter) case constant.WSSendMsg: log.NewInfo(m.OperationID, "sendMsgReq ", m.SendID, m.MsgIncr, m.ReqIdentifier) ws.sendMsgReq(conn, &m) - prome.PromeInc(prome.MsgRecvTotalCounter) + prome.Inc(prome.MsgRecvTotalCounter) case constant.WSSendSignalMsg: log.NewInfo(m.OperationID, "sendSignalMsgReq ", m.SendID, m.MsgIncr, m.ReqIdentifier) ws.sendSignalMsgReq(conn, &m) case constant.WSPullMsgBySeqList: log.NewInfo(m.OperationID, "pullMsgBySeqListReq ", m.SendID, m.MsgIncr, m.ReqIdentifier) ws.pullMsgBySeqListReq(conn, &m) - prome.PromeInc(prome.PullMsgBySeqListTotalCounter) + prome.Inc(prome.PullMsgBySeqListTotalCounter) case constant.WsLogoutMsg: log.NewInfo(m.OperationID, "conn.Close()", m.SendID, m.MsgIncr, m.ReqIdentifier) ws.userLogoutReq(conn, &m) diff --git a/internal/msggateway/relay_rpc_server.go b/internal/msggateway/relay_rpc_server.go index 2cea2ebcc..290453873 100644 --- a/internal/msggateway/relay_rpc_server.go +++ b/internal/msggateway/relay_rpc_server.go @@ -203,7 +203,7 @@ func (r *RPCServer) SuperGroupOnlineBatchPushOneMsg(_ context.Context, req *pbRe resultCode := sendMsgBatchToUser(userConn, replyBytes.Bytes(), req, platform, v) if resultCode == 0 && utils.IsContainInt(platform, r.pushTerminal) { tempT.OnlinePush = true - prome.PromeInc(prome.MsgOnlinePushSuccessCounter) + prome.Inc(prome.MsgOnlinePushSuccessCounter) log.Info(req.OperationID, "PushSuperMsgToUser is success By Ws", "args", req.String(), "recvPlatForm", constant.PlatformIDToName(platform), "recvID", v) temp.ResultCode = resultCode resp = append(resp, temp) diff --git a/internal/msggateway/ws_server.go b/internal/msggateway/ws_server.go index 6adad24fe..c8b70be2e 100644 --- a/internal/msggateway/ws_server.go +++ b/internal/msggateway/ws_server.go @@ -351,7 +351,7 @@ func (ws *WServer) addUserConn(uid string, platformID int, conn *UserConn, token for _, v := range ws.wsUserToConn { count = count + len(v) } - prome.PromeGaugeInc(prome.OnlineUserGauge) + prome.GaugeInc(prome.OnlineUserGauge) log.Debug(operationID, "WS Add operation", "", "wsUser added", ws.wsUserToConn, "connection_uid", uid, "connection_platform", constant.PlatformIDToName(platformID), "online_user_num", len(ws.wsUserToConn), "online_conn_num", count) } @@ -393,7 +393,7 @@ func (ws *WServer) delUserConn(conn *UserConn) { if callbackResp.ErrCode != 0 { log.NewError(operationID, utils.GetSelfFuncName(), "callbackUserOffline failed", callbackResp) } - prome.PromeGaugeDec(prome.OnlineUserGauge) + prome.GaugeDec(prome.OnlineUserGauge) } diff --git a/internal/msgtransfer/init.go b/internal/msgtransfer/init.go index 46fc1b585..832ecc65e 100644 --- a/internal/msgtransfer/init.go +++ b/internal/msgtransfer/init.go @@ -46,7 +46,7 @@ func (m *MsgTransfer) Run(promePort int) { go m.historyMongoCH.historyConsumerGroup.RegisterHandleAndConsumer(&m.historyMongoCH) go m.modifyCH.modifyMsgConsumerGroup.RegisterHandleAndConsumer(&m.modifyCH) go func() { - err := prome.StartPromeSrv(promePort) + err := prome.StartPrometheusSrv(promePort) if err != nil { panic(err) } diff --git a/internal/push/init.go b/internal/push/init.go index 090b24b5d..fa211c1ed 100644 --- a/internal/push/init.go +++ b/internal/push/init.go @@ -55,7 +55,7 @@ func (p *Push) Run(prometheusPort int) { go p.rpcServer.run() go p.pushCh.pushConsumerGroup.RegisterHandleAndConsumer(&p.pushCh) go func() { - err := prome.StartPromeSrv(prometheusPort) + err := prome.StartPrometheusSrv(prometheusPort) if err != nil { panic(err) } diff --git a/internal/push/push_to_client.go b/internal/push/push_to_client.go index 53b05b572..321f0c839 100644 --- a/internal/push/push_to_client.go +++ b/internal/push/push_to_client.go @@ -187,10 +187,10 @@ func (p *Pusher) OfflinePushMsg(ctx context.Context, sourceID string, msg *sdkws } err = p.offlinePusher.Push(ctx, offlinePushUserIDs, title, content, opts) if err != nil { - prome.PromeInc(prome.MsgOfflinePushFailedCounter) + prome.Inc(prome.MsgOfflinePushFailedCounter) return err } - prome.PromeInc(prome.MsgOfflinePushSuccessCounter) + prome.Inc(prome.MsgOfflinePushSuccessCounter) return nil } diff --git a/internal/rpc/msg/send_pull.go b/internal/rpc/msg/send_pull.go index 6461189bc..caa473be6 100644 --- a/internal/rpc/msg/send_pull.go +++ b/internal/rpc/msg/send_pull.go @@ -14,14 +14,14 @@ import ( func (m *msgServer) sendMsgSuperGroupChat(ctx context.Context, req *msg.SendMsgReq) (resp *msg.SendMsgResp, err error) { resp = &msg.SendMsgResp{} - promePkg.PromeInc(promePkg.WorkSuperGroupChatMsgRecvSuccessCounter) + promePkg.Inc(promePkg.WorkSuperGroupChatMsgRecvSuccessCounter) // callback if err = CallbackBeforeSendGroupMsg(ctx, req); err != nil && err != constant.ErrCallbackContinue { return nil, err } if _, err = m.messageVerification(ctx, req); err != nil { - promePkg.PromeInc(promePkg.WorkSuperGroupChatMsgProcessFailedCounter) + promePkg.Inc(promePkg.WorkSuperGroupChatMsgProcessFailedCounter) return nil, err } msgToMQSingle := msg.MsgDataToMQ{MsgData: req.MsgData} @@ -34,7 +34,7 @@ func (m *msgServer) sendMsgSuperGroupChat(ctx context.Context, req *msg.SendMsgR return nil, err } - promePkg.PromeInc(promePkg.WorkSuperGroupChatMsgProcessSuccessCounter) + promePkg.Inc(promePkg.WorkSuperGroupChatMsgProcessSuccessCounter) resp.SendTime = msgToMQSingle.MsgData.SendTime resp.ServerMsgID = msgToMQSingle.MsgData.ServerMsgID resp.ClientMsgID = msgToMQSingle.MsgData.ClientMsgID @@ -60,7 +60,7 @@ func (m *msgServer) sendMsgNotification(ctx context.Context, req *msg.SendMsgReq } func (m *msgServer) sendMsgSingleChat(ctx context.Context, req *msg.SendMsgReq) (resp *msg.SendMsgResp, err error) { - promePkg.PromeInc(promePkg.SingleChatMsgRecvSuccessCounter) + promePkg.Inc(promePkg.SingleChatMsgRecvSuccessCounter) if err = CallbackBeforeSendSingleMsg(ctx, req); err != nil && err != constant.ErrCallbackContinue { return nil, err } @@ -89,7 +89,7 @@ func (m *msgServer) sendMsgSingleChat(ctx context.Context, req *msg.SendMsgReq) if err != nil && err != constant.ErrCallbackContinue { return nil, err } - promePkg.PromeInc(promePkg.SingleChatMsgProcessSuccessCounter) + promePkg.Inc(promePkg.SingleChatMsgProcessSuccessCounter) resp.SendTime = msgToMQSingle.MsgData.SendTime resp.ServerMsgID = msgToMQSingle.MsgData.ServerMsgID resp.ClientMsgID = msgToMQSingle.MsgData.ClientMsgID @@ -98,7 +98,7 @@ func (m *msgServer) sendMsgSingleChat(ctx context.Context, req *msg.SendMsgReq) func (m *msgServer) sendMsgGroupChat(ctx context.Context, req *msg.SendMsgReq) (resp *msg.SendMsgResp, err error) { // callback - promePkg.PromeInc(promePkg.GroupChatMsgRecvSuccessCounter) + promePkg.Inc(promePkg.GroupChatMsgRecvSuccessCounter) err = CallbackBeforeSendGroupMsg(ctx, req) if err != nil && err != constant.ErrCallbackContinue { return nil, err @@ -106,7 +106,7 @@ func (m *msgServer) sendMsgGroupChat(ctx context.Context, req *msg.SendMsgReq) ( var memberUserIDList []string if memberUserIDList, err = m.messageVerification(ctx, req); err != nil { - promePkg.PromeInc(promePkg.GroupChatMsgProcessFailedCounter) + promePkg.Inc(promePkg.GroupChatMsgProcessFailedCounter) return nil, err } @@ -221,7 +221,7 @@ func (m *msgServer) sendMsgGroupChat(ctx context.Context, req *msg.SendMsgReq) ( } // - promePkg.PromeInc(promePkg.GroupChatMsgProcessSuccessCounter) + promePkg.Inc(promePkg.GroupChatMsgProcessSuccessCounter) resp.SendTime = msgToMQSingle.MsgData.SendTime resp.ServerMsgID = msgToMQSingle.MsgData.ServerMsgID resp.ClientMsgID = msgToMQSingle.MsgData.ClientMsgID diff --git a/internal/startrpc/start.go b/internal/startrpc/start.go index c6beb4880..eca62cad2 100644 --- a/internal/startrpc/start.go +++ b/internal/startrpc/start.go @@ -57,7 +57,7 @@ func start(rpcPorts []int, rpcRegisterName string, prometheusPorts []int, rpcFn return err } if config.Config.Prometheus.Enable { - err := prome.StartPromeSrv(*flagPrometheusPort) + err := prome.StartPrometheusSrv(*flagPrometheusPort) if err != nil { return err } diff --git a/pkg/common/db/controller/msg.go b/pkg/common/db/controller/msg.go index 33ed7f882..df49e75de 100644 --- a/pkg/common/db/controller/msg.go +++ b/pkg/common/db/controller/msg.go @@ -242,18 +242,18 @@ func (db *msgDatabase) BatchInsertChat2DB(ctx context.Context, sourceID string, doc.DocID = docID doc.Msg = msgsToMongo if err = db.mgo.Create(ctx, doc); err != nil { - prome.PromeInc(prome.MsgInsertMongoFailedCounter) + prome.Inc(prome.MsgInsertMongoFailedCounter) //log.NewError(operationID, "InsertOne failed", filter, err.Error(), sChat) return utils.Wrap(err, "") } - prome.PromeInc(prome.MsgInsertMongoSuccessCounter) + prome.Inc(prome.MsgInsertMongoSuccessCounter) } else { - prome.PromeInc(prome.MsgInsertMongoFailedCounter) + prome.Inc(prome.MsgInsertMongoFailedCounter) //log.Error(operationID, "FindOneAndUpdate failed ", err.Error(), filter) return utils.Wrap(err, "") } } else { - prome.PromeInc(prome.MsgInsertMongoSuccessCounter) + prome.Inc(prome.MsgInsertMongoSuccessCounter) } } if docIDNext != "" { @@ -262,11 +262,11 @@ func (db *msgDatabase) BatchInsertChat2DB(ctx context.Context, sourceID string, nextDoc.Msg = msgsToMongoNext //log.NewDebug(operationID, "filter ", seqUidNext, "list ", msgListToMongoNext, "userID: ", userID) if err = db.mgo.Create(ctx, nextDoc); err != nil { - prome.PromeInc(prome.MsgInsertMongoFailedCounter) + prome.Inc(prome.MsgInsertMongoFailedCounter) //log.NewError(operationID, "InsertOne failed", filter, err.Error(), sChat) return utils.Wrap(err, "") } - prome.PromeInc(prome.MsgInsertMongoSuccessCounter) + prome.Inc(prome.MsgInsertMongoSuccessCounter) } //log.Debug(operationID, "batch mgo cost time ", mongo2.getCurrentTimestampByMill()-newTime, userID, len(msgList)) return nil @@ -296,10 +296,10 @@ func (db *msgDatabase) BatchInsertChat2Cache(ctx context.Context, sourceID strin //log.Debug(operationID, "constant.SingleChatType lastMaxSeq before add ", currentMaxSeq, "userID ", sourceID, err) } if err != nil && err != redis.Nil { - prome.PromeInc(prome.SeqGetFailedCounter) + prome.Inc(prome.SeqGetFailedCounter) return 0, utils.Wrap(err, "") } - prome.PromeInc(prome.SeqGetSuccessCounter) + prome.Inc(prome.SeqGetSuccessCounter) lastMaxSeq := currentMaxSeq for _, m := range msgList { currentMaxSeq++ @@ -309,10 +309,10 @@ func (db *msgDatabase) BatchInsertChat2Cache(ctx context.Context, sourceID strin //log.Debug(operationID, "SetMessageToCache ", sourceID, len(msgList)) failedNum, err := db.cache.SetMessageToCache(ctx, sourceID, msgList) if err != nil { - prome.PromeAdd(prome.MsgInsertRedisFailedCounter, failedNum) + prome.Add(prome.MsgInsertRedisFailedCounter, failedNum) //log.Error(operationID, "setMessageToCache failed, continue ", err.Error(), len(msgList), sourceID) } else { - prome.PromeInc(prome.MsgInsertRedisSuccessCounter) + prome.Inc(prome.MsgInsertRedisSuccessCounter) } //log.Debug(operationID, "batch to redis cost time ", mongo2.getCurrentTimestampByMill()-newTime, sourceID, len(msgList)) if msgList[0].MsgData.SessionType == constant.SuperGroupChatType { @@ -321,9 +321,9 @@ func (db *msgDatabase) BatchInsertChat2Cache(ctx context.Context, sourceID strin err = db.cache.SetUserMaxSeq(ctx, sourceID, currentMaxSeq) } if err != nil { - prome.PromeInc(prome.SeqSetFailedCounter) + prome.Inc(prome.SeqSetFailedCounter) } else { - prome.PromeInc(prome.SeqSetSuccessCounter) + prome.Inc(prome.SeqSetSuccessCounter) } return lastMaxSeq, utils.Wrap(err, "") } @@ -463,18 +463,18 @@ func (db *msgDatabase) GetMsgBySeqs(ctx context.Context, userID string, seqs []i successMsgs, failedSeqs, err := db.cache.GetMessagesBySeq(ctx, userID, seqs) if err != nil { if err != redis.Nil { - prome.PromeAdd(prome.MsgPullFromRedisFailedCounter, len(failedSeqs)) + prome.Add(prome.MsgPullFromRedisFailedCounter, len(failedSeqs)) log.Error(tracelog.GetOperationID(ctx), "get message from redis exception", err.Error(), failedSeqs) } } - prome.PromeAdd(prome.MsgPullFromRedisSuccessCounter, len(successMsgs)) + prome.Add(prome.MsgPullFromRedisSuccessCounter, len(successMsgs)) if len(failedSeqs) > 0 { mongoMsgs, err := db.getMsgBySeqs(ctx, userID, seqs, constant.WriteDiffusion) if err != nil { - prome.PromeAdd(prome.MsgPullFromMongoFailedCounter, len(failedSeqs)) + prome.Add(prome.MsgPullFromMongoFailedCounter, len(failedSeqs)) return nil, err } - prome.PromeAdd(prome.MsgPullFromMongoSuccessCounter, len(mongoMsgs)) + prome.Add(prome.MsgPullFromMongoSuccessCounter, len(mongoMsgs)) successMsgs = append(successMsgs, mongoMsgs...) } return successMsgs, nil @@ -484,18 +484,18 @@ func (db *msgDatabase) GetSuperGroupMsgBySeqs(ctx context.Context, groupID strin successMsgs, failedSeqs, err := db.cache.GetMessagesBySeq(ctx, groupID, seqs) if err != nil { if err != redis.Nil { - prome.PromeAdd(prome.MsgPullFromRedisFailedCounter, len(failedSeqs)) + prome.Add(prome.MsgPullFromRedisFailedCounter, len(failedSeqs)) log.Error(tracelog.GetOperationID(ctx), "get message from redis exception", err.Error(), failedSeqs) } } - prome.PromeAdd(prome.MsgPullFromRedisSuccessCounter, len(successMsgs)) + prome.Add(prome.MsgPullFromRedisSuccessCounter, len(successMsgs)) if len(failedSeqs) > 0 { mongoMsgs, err := db.getMsgBySeqs(ctx, groupID, seqs, constant.ReadDiffusion) if err != nil { - prome.PromeAdd(prome.MsgPullFromMongoFailedCounter, len(failedSeqs)) + prome.Add(prome.MsgPullFromMongoFailedCounter, len(failedSeqs)) return nil, err } - prome.PromeAdd(prome.MsgPullFromMongoSuccessCounter, len(mongoMsgs)) + prome.Add(prome.MsgPullFromMongoSuccessCounter, len(mongoMsgs)) successMsgs = append(successMsgs, mongoMsgs...) } return successMsgs, nil diff --git a/pkg/common/kafka/producer.go b/pkg/common/kafka/producer.go index 69a5e9c53..b7ecba3e4 100644 --- a/pkg/common/kafka/producer.go +++ b/pkg/common/kafka/producer.go @@ -66,7 +66,7 @@ func (p *Producer) SendMessage(m proto.Message, key string, operationID string) partition, offset, err := p.producer.SendMessage(kMsg) log.Info(operationID, "ByteEncoder SendMessage end", "key ", kMsg.Key.Length(), kMsg.Value.Length(), p.producer) if err == nil { - prome.PromeInc(prome.SendMsgCounter) + prome.Inc(prome.SendMsgCounter) } return partition, offset, utils.Wrap(err, "") } diff --git a/pkg/common/prome/prometheus.go b/pkg/common/prome/prometheus.go index 065fcf461..89cbea072 100644 --- a/pkg/common/prome/prometheus.go +++ b/pkg/common/prome/prometheus.go @@ -11,7 +11,7 @@ import ( "github.com/prometheus/client_golang/prometheus/promhttp" ) -func StartPromeSrv(prometheusPort int) error { +func StartPrometheusSrv(prometheusPort int) error { if config.Config.Prometheus.Enable { http.Handle("/metrics", promhttp.Handler()) err := http.ListenAndServe(":"+strconv.Itoa(prometheusPort), nil) @@ -37,19 +37,19 @@ func (r responseBodyWriter) Write(b []byte) (int, error) { return r.ResponseWriter.Write(b) } -func PromeTheusMiddleware(c *gin.Context) { - PromeInc(ApiRequestCounter) +func PrometheusMiddleware(c *gin.Context) { + Inc(ApiRequestCounter) w := &responseBodyWriter{body: &bytes.Buffer{}, ResponseWriter: c.Writer} c.Writer = w c.Next() if c.Writer.Status() == http.StatusOK { - PromeInc(ApiRequestSuccessCounter) + Inc(ApiRequestSuccessCounter) } else { - PromeInc(ApiRequestFailedCounter) + Inc(ApiRequestFailedCounter) } } -func PromeInc(counter prometheus.Counter) { +func Inc(counter prometheus.Counter) { if config.Config.Prometheus.Enable { if counter != nil { counter.Inc() @@ -57,7 +57,7 @@ func PromeInc(counter prometheus.Counter) { } } -func PromeAdd(counter prometheus.Counter, add int) { +func Add(counter prometheus.Counter, add int) { if config.Config.Prometheus.Enable { if counter != nil { counter.Add(float64(add)) @@ -65,7 +65,7 @@ func PromeAdd(counter prometheus.Counter, add int) { } } -func PromeGaugeInc(gauges prometheus.Gauge) { +func GaugeInc(gauges prometheus.Gauge) { if config.Config.Prometheus.Enable { if gauges != nil { gauges.Inc() @@ -73,7 +73,7 @@ func PromeGaugeInc(gauges prometheus.Gauge) { } } -func PromeGaugeDec(gauges prometheus.Gauge) { +func GaugeDec(gauges prometheus.Gauge) { if config.Config.Prometheus.Enable { if gauges != nil { gauges.Dec()