pull/351/head
wangchuxiao 2 years ago
parent 266a2b4c76
commit a1f79f4508

@ -52,6 +52,10 @@ func main() {
log.Info("load config: ", config.Config)
r.GET("/swagger/*any", ginSwagger.WrapHandler(swaggerFiles.Handler))
if config.Config.Prometheus.Enable {
promePkg.NewApiRequestCounter()
promePkg.NewApiRequestFailedCounter()
promePkg.NewApiRequestSuccessCounter()
r.Use(promePkg.PromeTheusMiddleware)
r.GET("/metrics", promePkg.PrometheusHandler())
}
// user routing group, which handles user registration and login services

@ -4,6 +4,7 @@ import (
"Open_IM/pkg/common/config"
"Open_IM/pkg/common/constant"
"Open_IM/pkg/common/log"
promePkg "Open_IM/pkg/common/prometheus"
"Open_IM/pkg/common/token_verify"
"Open_IM/pkg/grpc-etcdv3/getcdv3"
pbRelay "Open_IM/pkg/proto/relay"
@ -12,11 +13,12 @@ import (
"bytes"
"context"
"encoding/gob"
"github.com/golang/protobuf/proto"
"net"
"strconv"
"strings"
"github.com/golang/protobuf/proto"
"github.com/gorilla/websocket"
"google.golang.org/grpc"
)
@ -52,7 +54,11 @@ func (r *RPCServer) run() {
panic("listening err:" + err.Error() + r.rpcRegisterName)
}
defer listener.Close()
srv := grpc.NewServer()
var grpcOpts []grpc.ServerOption
if config.Config.Prometheus.Enable {
grpcOpts = append(grpcOpts, promePkg.UnaryServerInterceptorProme)
}
srv := grpc.NewServer(grpcOpts...)
defer srv.GracefulStop()
pbRelay.RegisterRelayServer(srv, r)

@ -4,6 +4,7 @@ import (
"Open_IM/pkg/common/config"
"Open_IM/pkg/common/constant"
"Open_IM/pkg/common/log"
promePkg "Open_IM/pkg/common/prometheus"
"Open_IM/pkg/grpc-etcdv3/getcdv3"
pbPush "Open_IM/pkg/proto/push"
"Open_IM/pkg/utils"
@ -42,7 +43,11 @@ func (r *RPCServer) run() {
panic("listening err:" + err.Error() + r.rpcRegisterName)
}
defer listener.Close()
srv := grpc.NewServer()
var grpcOpts []grpc.ServerOption
if config.Config.Prometheus.Enable {
grpcOpts = append(grpcOpts, promePkg.UnaryServerInterceptorProme)
}
srv := grpc.NewServer(grpcOpts...)
defer srv.GracefulStop()
pbPush.RegisterPushMsgServiceServer(srv, r)
rpcRegisterIP := config.Config.RpcRegisterIP

@ -126,8 +126,11 @@ func (rpc *rpcAuth) Run() {
panic("listening err:" + err.Error() + rpc.rpcRegisterName)
}
log.NewInfo(operationID, "listen network success, ", address, listener)
//grpc server
srv := grpc.NewServer()
var grpcOpts []grpc.ServerOption
if config.Config.Prometheus.Enable {
grpcOpts = append(grpcOpts, promePkg.UnaryServerInterceptorProme)
}
srv := grpc.NewServer(grpcOpts...)
defer srv.GracefulStop()
//service registers with etcd

@ -7,6 +7,7 @@ import (
imdb "Open_IM/pkg/common/db/mysql_model/im_mysql_model"
rocksCache "Open_IM/pkg/common/db/rocks_cache"
"Open_IM/pkg/common/log"
promePkg "Open_IM/pkg/common/prometheus"
"Open_IM/pkg/grpc-etcdv3/getcdv3"
pbConversation "Open_IM/pkg/proto/conversation"
"Open_IM/pkg/utils"
@ -184,7 +185,11 @@ func (rpc *rpcConversation) Run() {
}
log.NewInfo("0", "listen network success, ", address, listener)
//grpc server
srv := grpc.NewServer()
var grpcOpts []grpc.ServerOption
if config.Config.Prometheus.Enable {
grpcOpts = append(grpcOpts, promePkg.UnaryServerInterceptorProme)
}
srv := grpc.NewServer(grpcOpts...)
defer srv.GracefulStop()
//service registers with etcd

@ -6,8 +6,9 @@ import (
"Open_IM/pkg/common/constant"
"Open_IM/pkg/common/db"
imdb "Open_IM/pkg/common/db/mysql_model/im_mysql_model"
"Open_IM/pkg/common/db/rocks_cache"
rocksCache "Open_IM/pkg/common/db/rocks_cache"
"Open_IM/pkg/common/log"
promePkg "Open_IM/pkg/common/prometheus"
"Open_IM/pkg/common/token_verify"
cp "Open_IM/pkg/common/utils"
"Open_IM/pkg/grpc-etcdv3/getcdv3"
@ -60,7 +61,11 @@ func (s *friendServer) Run() {
log.NewInfo("0", "listen ok ", address)
defer listener.Close()
//grpc server
srv := grpc.NewServer()
var grpcOpts []grpc.ServerOption
if config.Config.Prometheus.Enable {
grpcOpts = append(grpcOpts, promePkg.UnaryServerInterceptorProme)
}
srv := grpc.NewServer(grpcOpts...)
defer srv.GracefulStop()
//User friend related services register to etcd
pbFriend.RegisterFriendServer(srv, s)

@ -8,6 +8,7 @@ import (
imdb "Open_IM/pkg/common/db/mysql_model/im_mysql_model"
rocksCache "Open_IM/pkg/common/db/rocks_cache"
"Open_IM/pkg/common/log"
promePkg "Open_IM/pkg/common/prometheus"
"Open_IM/pkg/common/token_verify"
cp "Open_IM/pkg/common/utils"
"Open_IM/pkg/grpc-etcdv3/getcdv3"
@ -69,6 +70,10 @@ func (s *groupServer) Run() {
grpc.MaxRecvMsgSize(recvSize),
grpc.MaxSendMsgSize(sendSize),
}
var grpcOpts []grpc.ServerOption
if config.Config.Prometheus.Enable {
grpcOpts = append(grpcOpts, promePkg.UnaryServerInterceptorProme)
}
srv := grpc.NewServer(options...)
defer srv.GracefulStop()
//Service registers with etcd

@ -6,6 +6,7 @@ import (
"Open_IM/pkg/common/db"
"Open_IM/pkg/common/kafka"
"Open_IM/pkg/common/log"
promePkg "Open_IM/pkg/common/prometheus"
"Open_IM/pkg/grpc-etcdv3/getcdv3"
"Open_IM/pkg/proto/msg"
"Open_IM/pkg/utils"
@ -13,16 +14,9 @@ import (
"strconv"
"strings"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"google.golang.org/grpc"
)
var (
sendMsgSuccessCounter prometheus.Counter
sendMsgFailedCounter prometheus.Counter
)
type rpcChat struct {
rpcPort int
rpcRegisterName string
@ -55,14 +49,7 @@ func NewRpcChatServer(port int) *rpcChat {
}
func (rpc *rpcChat) initPrometheus() {
sendMsgSuccessCounter = promauto.NewCounter(prometheus.CounterOpts{
Name: "send_msg_success",
Help: "The number of send msg success",
})
sendMsgFailedCounter = promauto.NewCounter(prometheus.CounterOpts{
Name: "send_msg_failed",
Help: "The number of send msg failed",
})
promePkg.NewSendMsgCount()
}
func (rpc *rpcChat) Run() {
@ -80,7 +67,11 @@ func (rpc *rpcChat) Run() {
}
log.Info("", "listen network success, address ", address)
srv := grpc.NewServer()
var grpcOpts []grpc.ServerOption
if config.Config.Prometheus.Enable {
grpcOpts = append(grpcOpts, promePkg.UnaryServerInterceptorProme)
}
srv := grpc.NewServer(grpcOpts...)
defer srv.GracefulStop()
rpcRegisterIP := config.Config.RpcRegisterIP

@ -8,6 +8,7 @@ import (
imdb "Open_IM/pkg/common/db/mysql_model/im_mysql_model"
rocksCache "Open_IM/pkg/common/db/rocks_cache"
"Open_IM/pkg/common/log"
promePkg "Open_IM/pkg/common/prometheus"
"Open_IM/pkg/common/token_verify"
"Open_IM/pkg/grpc-etcdv3/getcdv3"
pbConversation "Open_IM/pkg/proto/conversation"
@ -61,7 +62,11 @@ func (s *userServer) Run() {
log.NewInfo("0", "listen network success, address ", address, listener)
defer listener.Close()
//grpc server
srv := grpc.NewServer()
var grpcOpts []grpc.ServerOption
if config.Config.Prometheus.Enable {
grpcOpts = append(grpcOpts, promePkg.UnaryServerInterceptorProme)
}
srv := grpc.NewServer(grpcOpts...)
defer srv.GracefulStop()
//Service registers with etcd
pbUser.RegisterUserServer(srv, s)

@ -4,8 +4,11 @@ import (
log "Open_IM/pkg/common/log"
"Open_IM/pkg/utils"
"errors"
"github.com/Shopify/sarama"
"github.com/golang/protobuf/proto"
promePkg "Open_IM/pkg/common/prometheus"
)
type Producer struct {
@ -57,5 +60,8 @@ func (p *Producer) SendMessage(m proto.Message, key string, operationID string)
}
a, b, c := p.producer.SendMessage(kMsg)
log.Info(operationID, "ByteEncoder SendMessage end", "key ", kMsg.Key.Length(), kMsg.Value.Length(), p.producer)
if c == nil {
promePkg.PromeInc(promePkg.SendMsgCounter)
}
return a, b, utils.Wrap(c, "")
}

@ -6,7 +6,7 @@ import (
)
var (
// user rpc
//auth rpc
UserLoginCounter prometheus.Counter
UserRegisterCounter prometheus.Counter
@ -14,6 +14,18 @@ var (
SeqGetFailedCounter prometheus.Counter
SeqSetSuccessCounter prometheus.Counter
SeqSetFailedCounter prometheus.Counter
// api
ApiRequestCounter prometheus.Counter
ApiRequestSuccessCounter prometheus.Counter
ApiRequestFailedCounter prometheus.Counter
// grpc
GrpcRequestCounter prometheus.Counter
GrpcRequestSuccessCounter prometheus.Counter
GrpcRequestFailedCounter prometheus.Counter
SendMsgCounter prometheus.Counter
)
func NewUserLoginCounter() {
@ -23,6 +35,13 @@ func NewUserLoginCounter() {
})
}
func NewUserRegisterCounter() {
UserRegisterCounter = promauto.NewCounter(prometheus.CounterOpts{
Name: "user_register",
Help: "The number of user register",
})
}
func NewSeqGetSuccessCounter() {
SeqGetSuccessCounter = promauto.NewCounter(prometheus.CounterOpts{
Name: "seq_get_success",
@ -48,3 +67,52 @@ func NewSeqSetFailedCounter() {
Help: "The number of failed set seq",
})
}
func NewApiRequestCounter() {
ApiRequestCounter = promauto.NewCounter(prometheus.CounterOpts{
Name: "api_request",
Help: "The number of api request",
})
}
func NewApiRequestSuccessCounter() {
ApiRequestSuccessCounter = promauto.NewCounter(prometheus.CounterOpts{
Name: "api_request_success",
Help: "The number of api request success",
})
}
func NewApiRequestFailedCounter() {
ApiRequestFailedCounter = promauto.NewCounter(prometheus.CounterOpts{
Name: "api_request_failed",
Help: "The number of api request failed",
})
}
func NewGrpcRequestCounter() {
GrpcRequestCounter = promauto.NewCounter(prometheus.CounterOpts{
Name: "grpc_request",
Help: "The number of api request",
})
}
func NewGrpcRequestSuccessCounter() {
GrpcRequestSuccessCounter = promauto.NewCounter(prometheus.CounterOpts{
Name: "grpc_request_success",
Help: "The number of grpc request success",
})
}
func NewGrpcRequestFailedCounter() {
GrpcRequestFailedCounter = promauto.NewCounter(prometheus.CounterOpts{
Name: "grpc_request_failed",
Help: "The number of grpc request failed",
})
}
func NewSendMsgCount() {
SendMsgCounter = promauto.NewCounter(prometheus.CounterOpts{
Name: "send_msg",
Help: "The number of send msg",
})
}

@ -0,0 +1,35 @@
package prometheus
import (
"context"
"encoding/json"
"time"
"Open_IM/pkg/common/log"
"google.golang.org/grpc"
"google.golang.org/grpc/peer"
)
func UnaryServerInterceptorProme(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
remote, _ := peer.FromContext(ctx)
remoteAddr := remote.Addr.String()
in, _ := json.Marshal(req)
inStr := string(in)
log.NewInfo("ip", remoteAddr, "access_start", info.FullMethod, "in", inStr)
start := time.Now()
defer func() {
out, _ := json.Marshal(resp)
outStr := string(out)
duration := int64(time.Since(start) / time.Millisecond)
if duration >= 500 {
log.NewInfo("ip", remoteAddr, "access_end", info.FullMethod, "in", inStr, "out", outStr, "err", err, "duration/ms", duration)
} else {
log.NewInfo("ip", remoteAddr, "access_end", info.FullMethod, "in", inStr, "out", outStr, "err", err, "duration/ms", duration)
}
}()
resp, err = handler(ctx, req)
return
}

@ -2,6 +2,7 @@ package prometheus
import (
"Open_IM/pkg/common/config"
"bytes"
"net/http"
"strconv"
@ -26,6 +27,28 @@ func PrometheusHandler() gin.HandlerFunc {
}
}
type responseBodyWriter struct {
gin.ResponseWriter
body *bytes.Buffer
}
func (r responseBodyWriter) Write(b []byte) (int, error) {
r.body.Write(b)
return r.ResponseWriter.Write(b)
}
func PromeTheusMiddleware(c *gin.Context) {
PromeInc(ApiRequestCounter)
w := &responseBodyWriter{body: &bytes.Buffer{}, ResponseWriter: c.Writer}
c.Writer = w
c.Next()
if c.Writer.Status() == http.StatusOK {
PromeInc(ApiRequestSuccessCounter)
} else {
PromeInc(ApiRequestFailedCounter)
}
}
func PromeInc(counter prometheus.Counter) {
if config.Config.Prometheus.Enable {
counter.Inc()

Loading…
Cancel
Save