diff --git a/cmd/open_im_api/main.go b/cmd/open_im_api/main.go index a4db0e616..8f45a38c8 100644 --- a/cmd/open_im_api/main.go +++ b/cmd/open_im_api/main.go @@ -52,6 +52,10 @@ func main() { log.Info("load config: ", config.Config) r.GET("/swagger/*any", ginSwagger.WrapHandler(swaggerFiles.Handler)) if config.Config.Prometheus.Enable { + promePkg.NewApiRequestCounter() + promePkg.NewApiRequestFailedCounter() + promePkg.NewApiRequestSuccessCounter() + r.Use(promePkg.PromeTheusMiddleware) r.GET("/metrics", promePkg.PrometheusHandler()) } // user routing group, which handles user registration and login services diff --git a/docker-compose.yaml b/docker-compose.yaml index 8a903890a..8692c7ba0 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -164,6 +164,7 @@ services: depends_on: - prometheus network_mode: "host" + # -rw-r----- node-exporter: image: quay.io/prometheus/node-exporter diff --git a/docker-compose_cfg/grafana.ini b/docker-compose_cfg/grafana.ini index eeac17d6f..180de10af 100644 --- a/docker-compose_cfg/grafana.ini +++ b/docker-compose_cfg/grafana.ini @@ -1105,7 +1105,7 @@ disable_sanitize_html = false enable_alpha = false app_tls_skip_verify_insecure = false # Enter a comma-separated list of plugin identifiers to identify plugins to load even if they are unsigned. Plugins with modified signatures are never loaded. -allow_loading_unsigned_plugins = +allow_loading_unsigned_plugins = grafana-simple-json-backend-datasource # Enable or disable installing / uninstalling / updating plugins directly from within Grafana. plugin_admin_enabled = true plugin_admin_external_manage_enabled = false diff --git a/internal/msg_gateway/gate/relay_rpc_server.go b/internal/msg_gateway/gate/relay_rpc_server.go index 35636a762..6b673cc14 100644 --- a/internal/msg_gateway/gate/relay_rpc_server.go +++ b/internal/msg_gateway/gate/relay_rpc_server.go @@ -13,11 +13,12 @@ import ( "bytes" "context" "encoding/gob" - "github.com/golang/protobuf/proto" "net" "strconv" "strings" + "github.com/golang/protobuf/proto" + "github.com/gorilla/websocket" "google.golang.org/grpc" ) @@ -63,7 +64,14 @@ func (r *RPCServer) run() { panic("listening err:" + err.Error() + r.rpcRegisterName) } defer listener.Close() - srv := grpc.NewServer() + var grpcOpts []grpc.ServerOption + if config.Config.Prometheus.Enable { + promePkg.NewGrpcRequestCounter() + promePkg.NewGrpcRequestFailedCounter() + promePkg.NewGrpcRequestSuccessCounter() + grpcOpts = append(grpcOpts, grpc.UnaryInterceptor(promePkg.UnaryServerInterceptorProme)) + } + srv := grpc.NewServer(grpcOpts...) defer srv.GracefulStop() pbRelay.RegisterRelayServer(srv, r) diff --git a/internal/msg_transfer/logic/init.go b/internal/msg_transfer/logic/init.go index aa7ba0b9a..201c6ee41 100644 --- a/internal/msg_transfer/logic/init.go +++ b/internal/msg_transfer/logic/init.go @@ -37,7 +37,9 @@ var ( func Init() { cmdCh = make(chan Cmd2Value, 10000) w = new(sync.Mutex) - initPrometheus() + if config.Config.Prometheus.Enable { + initPrometheus() + } persistentCH.Init() // ws2mschat save mysql historyCH.Init(cmdCh) // historyMongoCH.Init() diff --git a/internal/push/logic/push_rpc_server.go b/internal/push/logic/push_rpc_server.go index 8fb99dbfb..44269d504 100644 --- a/internal/push/logic/push_rpc_server.go +++ b/internal/push/logic/push_rpc_server.go @@ -4,6 +4,7 @@ import ( "Open_IM/pkg/common/config" "Open_IM/pkg/common/constant" "Open_IM/pkg/common/log" + promePkg "Open_IM/pkg/common/prometheus" "Open_IM/pkg/grpc-etcdv3/getcdv3" pbPush "Open_IM/pkg/proto/push" "Open_IM/pkg/utils" @@ -42,7 +43,14 @@ func (r *RPCServer) run() { panic("listening err:" + err.Error() + r.rpcRegisterName) } defer listener.Close() - srv := grpc.NewServer() + var grpcOpts []grpc.ServerOption + if config.Config.Prometheus.Enable { + promePkg.NewGrpcRequestCounter() + promePkg.NewGrpcRequestFailedCounter() + promePkg.NewGrpcRequestSuccessCounter() + grpcOpts = append(grpcOpts, grpc.UnaryInterceptor(promePkg.UnaryServerInterceptorProme)) + } + srv := grpc.NewServer(grpcOpts...) defer srv.GracefulStop() pbPush.RegisterPushMsgServiceServer(srv, r) rpcRegisterIP := config.Config.RpcRegisterIP diff --git a/internal/rpc/admin_cms/admin_cms.go b/internal/rpc/admin_cms/admin_cms.go index 6c477af16..bde6d421b 100644 --- a/internal/rpc/admin_cms/admin_cms.go +++ b/internal/rpc/admin_cms/admin_cms.go @@ -6,6 +6,7 @@ import ( "Open_IM/pkg/common/db" imdb "Open_IM/pkg/common/db/mysql_model/im_mysql_model" "Open_IM/pkg/common/log" + promePkg "Open_IM/pkg/common/prometheus" "Open_IM/pkg/common/token_verify" "Open_IM/pkg/grpc-etcdv3/getcdv3" pbAdminCMS "Open_IM/pkg/proto/admin_cms" @@ -57,8 +58,14 @@ func (s *adminCMSServer) Run() { } log.NewInfo("0", "listen network success, ", address, listener) defer listener.Close() - //grpc server - srv := grpc.NewServer() + var grpcOpts []grpc.ServerOption + if config.Config.Prometheus.Enable { + promePkg.NewGrpcRequestCounter() + promePkg.NewGrpcRequestFailedCounter() + promePkg.NewGrpcRequestSuccessCounter() + grpcOpts = append(grpcOpts, grpc.UnaryInterceptor(promePkg.UnaryServerInterceptorProme)) + } + srv := grpc.NewServer(grpcOpts...) defer srv.GracefulStop() //Service registers with etcd pbAdminCMS.RegisterAdminCMSServer(srv, s) diff --git a/internal/rpc/auth/auth.go b/internal/rpc/auth/auth.go index 3f6a957db..498edf609 100644 --- a/internal/rpc/auth/auth.go +++ b/internal/rpc/auth/auth.go @@ -22,11 +22,6 @@ import ( "google.golang.org/grpc" ) -func (rpc *rpcAuth) initPrometheus() { - promePkg.NewUserLoginCounter() - promePkg.NewUserRegisterCounter() -} - func (rpc *rpcAuth) UserRegister(_ context.Context, req *pbAuth.UserRegisterReq) (*pbAuth.UserRegisterResp, error) { log.NewInfo(req.OperationID, utils.GetSelfFuncName(), " rpc args ", req.String()) var user db.User @@ -126,8 +121,16 @@ func (rpc *rpcAuth) Run() { panic("listening err:" + err.Error() + rpc.rpcRegisterName) } log.NewInfo(operationID, "listen network success, ", address, listener) - //grpc server - srv := grpc.NewServer() + var grpcOpts []grpc.ServerOption + if config.Config.Prometheus.Enable { + promePkg.NewGrpcRequestCounter() + promePkg.NewGrpcRequestFailedCounter() + promePkg.NewGrpcRequestSuccessCounter() + promePkg.NewUserRegisterCounter() + promePkg.NewUserLoginCounter() + grpcOpts = append(grpcOpts, grpc.UnaryInterceptor(promePkg.UnaryServerInterceptorProme)) + } + srv := grpc.NewServer(grpcOpts...) defer srv.GracefulStop() //service registers with etcd @@ -149,9 +152,6 @@ func (rpc *rpcAuth) Run() { } log.NewInfo(operationID, "RegisterAuthServer ok ", rpc.etcdSchema, strings.Join(rpc.etcdAddr, ","), rpcRegisterIP, rpc.rpcPort, rpc.rpcRegisterName) - if config.Config.Prometheus.Enable { - rpc.initPrometheus() - } err = srv.Serve(listener) if err != nil { log.NewError(operationID, "Serve failed ", err.Error()) diff --git a/internal/rpc/conversation/conversaion.go b/internal/rpc/conversation/conversaion.go index 3a6af935e..ee04b1974 100644 --- a/internal/rpc/conversation/conversaion.go +++ b/internal/rpc/conversation/conversaion.go @@ -7,6 +7,7 @@ import ( imdb "Open_IM/pkg/common/db/mysql_model/im_mysql_model" rocksCache "Open_IM/pkg/common/db/rocks_cache" "Open_IM/pkg/common/log" + promePkg "Open_IM/pkg/common/prometheus" "Open_IM/pkg/grpc-etcdv3/getcdv3" pbConversation "Open_IM/pkg/proto/conversation" "Open_IM/pkg/utils" @@ -184,7 +185,14 @@ func (rpc *rpcConversation) Run() { } log.NewInfo("0", "listen network success, ", address, listener) //grpc server - srv := grpc.NewServer() + var grpcOpts []grpc.ServerOption + if config.Config.Prometheus.Enable { + promePkg.NewGrpcRequestCounter() + promePkg.NewGrpcRequestFailedCounter() + promePkg.NewGrpcRequestSuccessCounter() + grpcOpts = append(grpcOpts, grpc.UnaryInterceptor(promePkg.UnaryServerInterceptorProme)) + } + srv := grpc.NewServer(grpcOpts...) defer srv.GracefulStop() //service registers with etcd diff --git a/internal/rpc/friend/friend.go b/internal/rpc/friend/friend.go index 29ec99eab..696217cb8 100644 --- a/internal/rpc/friend/friend.go +++ b/internal/rpc/friend/friend.go @@ -6,8 +6,9 @@ import ( "Open_IM/pkg/common/constant" "Open_IM/pkg/common/db" imdb "Open_IM/pkg/common/db/mysql_model/im_mysql_model" - "Open_IM/pkg/common/db/rocks_cache" + rocksCache "Open_IM/pkg/common/db/rocks_cache" "Open_IM/pkg/common/log" + promePkg "Open_IM/pkg/common/prometheus" "Open_IM/pkg/common/token_verify" cp "Open_IM/pkg/common/utils" "Open_IM/pkg/grpc-etcdv3/getcdv3" @@ -60,7 +61,14 @@ func (s *friendServer) Run() { log.NewInfo("0", "listen ok ", address) defer listener.Close() //grpc server - srv := grpc.NewServer() + var grpcOpts []grpc.ServerOption + if config.Config.Prometheus.Enable { + promePkg.NewGrpcRequestCounter() + promePkg.NewGrpcRequestFailedCounter() + promePkg.NewGrpcRequestSuccessCounter() + grpcOpts = append(grpcOpts, grpc.UnaryInterceptor(promePkg.UnaryServerInterceptorProme)) + } + srv := grpc.NewServer(grpcOpts...) defer srv.GracefulStop() //User friend related services register to etcd pbFriend.RegisterFriendServer(srv, s) diff --git a/internal/rpc/group/group.go b/internal/rpc/group/group.go index a36547c2c..2193cd89c 100644 --- a/internal/rpc/group/group.go +++ b/internal/rpc/group/group.go @@ -8,6 +8,7 @@ import ( imdb "Open_IM/pkg/common/db/mysql_model/im_mysql_model" rocksCache "Open_IM/pkg/common/db/rocks_cache" "Open_IM/pkg/common/log" + promePkg "Open_IM/pkg/common/prometheus" "Open_IM/pkg/common/token_verify" cp "Open_IM/pkg/common/utils" "Open_IM/pkg/grpc-etcdv3/getcdv3" @@ -69,6 +70,13 @@ func (s *groupServer) Run() { grpc.MaxRecvMsgSize(recvSize), grpc.MaxSendMsgSize(sendSize), } + var grpcOpts []grpc.ServerOption + if config.Config.Prometheus.Enable { + promePkg.NewGrpcRequestCounter() + promePkg.NewGrpcRequestFailedCounter() + promePkg.NewGrpcRequestSuccessCounter() + grpcOpts = append(grpcOpts, grpc.UnaryInterceptor(promePkg.UnaryServerInterceptorProme)) + } srv := grpc.NewServer(options...) defer srv.GracefulStop() //Service registers with etcd diff --git a/internal/rpc/msg/rpcChat.go b/internal/rpc/msg/rpcChat.go index 536a5897b..119c3ae7f 100644 --- a/internal/rpc/msg/rpcChat.go +++ b/internal/rpc/msg/rpcChat.go @@ -6,6 +6,7 @@ import ( "Open_IM/pkg/common/db" "Open_IM/pkg/common/kafka" "Open_IM/pkg/common/log" + promePkg "Open_IM/pkg/common/prometheus" "Open_IM/pkg/grpc-etcdv3/getcdv3" "Open_IM/pkg/proto/msg" "Open_IM/pkg/utils" @@ -13,7 +14,6 @@ import ( "strconv" "strings" - promePkg "Open_IM/pkg/common/prometheus" "google.golang.org/grpc" ) @@ -94,7 +94,14 @@ func (rpc *rpcChat) Run() { } log.Info("", "listen network success, address ", address) - srv := grpc.NewServer() + var grpcOpts []grpc.ServerOption + if config.Config.Prometheus.Enable { + promePkg.NewGrpcRequestCounter() + promePkg.NewGrpcRequestFailedCounter() + promePkg.NewGrpcRequestSuccessCounter() + grpcOpts = append(grpcOpts, grpc.UnaryInterceptor(promePkg.UnaryServerInterceptorProme)) + } + srv := grpc.NewServer(grpcOpts...) defer srv.GracefulStop() rpcRegisterIP := config.Config.RpcRegisterIP diff --git a/internal/rpc/office/office.go b/internal/rpc/office/office.go index 741938ca3..4d12267e5 100644 --- a/internal/rpc/office/office.go +++ b/internal/rpc/office/office.go @@ -8,6 +8,7 @@ import ( "Open_IM/pkg/common/db/mysql_model/im_mysql_model" imdb "Open_IM/pkg/common/db/mysql_model/im_mysql_model" "Open_IM/pkg/common/log" + promePkg "Open_IM/pkg/common/prometheus" "Open_IM/pkg/grpc-etcdv3/getcdv3" pbCache "Open_IM/pkg/proto/cache" pbOffice "Open_IM/pkg/proto/office" @@ -63,11 +64,17 @@ func (s *officeServer) Run() { //grpc server recvSize := 1024 * 1024 * 30 sendSize := 1024 * 1024 * 30 - var options = []grpc.ServerOption{ + var grpcOpts = []grpc.ServerOption{ grpc.MaxRecvMsgSize(recvSize), grpc.MaxSendMsgSize(sendSize), } - srv := grpc.NewServer(options...) + if config.Config.Prometheus.Enable { + promePkg.NewGrpcRequestCounter() + promePkg.NewGrpcRequestFailedCounter() + promePkg.NewGrpcRequestSuccessCounter() + grpcOpts = append(grpcOpts, grpc.UnaryInterceptor(promePkg.UnaryServerInterceptorProme)) + } + srv := grpc.NewServer(grpcOpts...) defer srv.GracefulStop() //Service registers with etcd pbOffice.RegisterOfficeServiceServer(srv, s) diff --git a/internal/rpc/organization/organization.go b/internal/rpc/organization/organization.go index 89cd15321..327e40003 100644 --- a/internal/rpc/organization/organization.go +++ b/internal/rpc/organization/organization.go @@ -8,6 +8,7 @@ import ( imdb "Open_IM/pkg/common/db/mysql_model/im_mysql_model" rocksCache "Open_IM/pkg/common/db/rocks_cache" "Open_IM/pkg/common/log" + promePkg "Open_IM/pkg/common/prometheus" "Open_IM/pkg/common/token_verify" "Open_IM/pkg/grpc-etcdv3/getcdv3" pbAuth "Open_IM/pkg/proto/auth" @@ -58,7 +59,14 @@ func (s *organizationServer) Run() { log.NewInfo("", "listen network success, ", address, listener) defer listener.Close() //grpc server - srv := grpc.NewServer() + var grpcOpts []grpc.ServerOption + if config.Config.Prometheus.Enable { + promePkg.NewGrpcRequestCounter() + promePkg.NewGrpcRequestFailedCounter() + promePkg.NewGrpcRequestSuccessCounter() + grpcOpts = append(grpcOpts, grpc.UnaryInterceptor(promePkg.UnaryServerInterceptorProme)) + } + srv := grpc.NewServer(grpcOpts...) defer srv.GracefulStop() //Service registers with etcd rpc.RegisterOrganizationServer(srv, s) diff --git a/internal/rpc/user/user.go b/internal/rpc/user/user.go index cf4e959a5..080a50b7f 100644 --- a/internal/rpc/user/user.go +++ b/internal/rpc/user/user.go @@ -8,6 +8,7 @@ import ( imdb "Open_IM/pkg/common/db/mysql_model/im_mysql_model" rocksCache "Open_IM/pkg/common/db/rocks_cache" "Open_IM/pkg/common/log" + promePkg "Open_IM/pkg/common/prometheus" "Open_IM/pkg/common/token_verify" "Open_IM/pkg/grpc-etcdv3/getcdv3" pbConversation "Open_IM/pkg/proto/conversation" @@ -61,7 +62,14 @@ func (s *userServer) Run() { log.NewInfo("0", "listen network success, address ", address, listener) defer listener.Close() //grpc server - srv := grpc.NewServer() + var grpcOpts []grpc.ServerOption + if config.Config.Prometheus.Enable { + promePkg.NewGrpcRequestCounter() + promePkg.NewGrpcRequestFailedCounter() + promePkg.NewGrpcRequestSuccessCounter() + grpcOpts = append(grpcOpts, grpc.UnaryInterceptor(promePkg.UnaryServerInterceptorProme)) + } + srv := grpc.NewServer(grpcOpts...) defer srv.GracefulStop() //Service registers with etcd pbUser.RegisterUserServer(srv, s) diff --git a/pkg/common/kafka/producer.go b/pkg/common/kafka/producer.go index 4eb10aebd..15c08ac04 100644 --- a/pkg/common/kafka/producer.go +++ b/pkg/common/kafka/producer.go @@ -4,8 +4,11 @@ import ( log "Open_IM/pkg/common/log" "Open_IM/pkg/utils" "errors" + "github.com/Shopify/sarama" "github.com/golang/protobuf/proto" + + promePkg "Open_IM/pkg/common/prometheus" ) type Producer struct { @@ -57,5 +60,8 @@ func (p *Producer) SendMessage(m proto.Message, key string, operationID string) } a, b, c := p.producer.SendMessage(kMsg) log.Info(operationID, "ByteEncoder SendMessage end", "key ", kMsg.Key.Length(), kMsg.Value.Length(), p.producer) + if c == nil { + promePkg.PromeInc(promePkg.SendMsgCounter) + } return a, b, utils.Wrap(c, "") } diff --git a/pkg/common/prometheus/gather.go b/pkg/common/prometheus/gather.go index 471de8d0f..af47db1a6 100644 --- a/pkg/common/prometheus/gather.go +++ b/pkg/common/prometheus/gather.go @@ -6,6 +6,7 @@ import ( ) var ( + //auth rpc UserLoginCounter prometheus.Counter UserRegisterCounter prometheus.Counter @@ -46,6 +47,18 @@ var ( MsgOnlinePushSuccessCounter prometheus.Counter MsgOfflinePushSuccessCounter prometheus.Counter MsgOfflinePushFailedCounter prometheus.Counter + // api + ApiRequestCounter prometheus.Counter + ApiRequestSuccessCounter prometheus.Counter + ApiRequestFailedCounter prometheus.Counter + + // grpc + GrpcRequestCounter prometheus.Counter + GrpcRequestSuccessCounter prometheus.Counter + GrpcRequestFailedCounter prometheus.Counter + + SendMsgCounter prometheus.Counter + ) func NewUserLoginCounter() { @@ -87,6 +100,55 @@ func NewSeqSetFailedCounter() { }) } +func NewApiRequestCounter() { + ApiRequestCounter = promauto.NewCounter(prometheus.CounterOpts{ + Name: "api_request", + Help: "The number of api request", + }) +} + +func NewApiRequestSuccessCounter() { + ApiRequestSuccessCounter = promauto.NewCounter(prometheus.CounterOpts{ + Name: "api_request_success", + Help: "The number of api request success", + }) +} + +func NewApiRequestFailedCounter() { + ApiRequestFailedCounter = promauto.NewCounter(prometheus.CounterOpts{ + Name: "api_request_failed", + Help: "The number of api request failed", + }) +} + +func NewGrpcRequestCounter() { + GrpcRequestCounter = promauto.NewCounter(prometheus.CounterOpts{ + Name: "grpc_request", + Help: "The number of api request", + }) +} + +func NewGrpcRequestSuccessCounter() { + GrpcRequestSuccessCounter = promauto.NewCounter(prometheus.CounterOpts{ + Name: "grpc_request_success", + Help: "The number of grpc request success", + }) +} + +func NewGrpcRequestFailedCounter() { + GrpcRequestFailedCounter = promauto.NewCounter(prometheus.CounterOpts{ + Name: "grpc_request_failed", + Help: "The number of grpc request failed", + }) +} + +func NewSendMsgCount() { + SendMsgCounter = promauto.NewCounter(prometheus.CounterOpts{ + Name: "send_msg", + Help: "The number of send msg", + }) +} + func NewMsgInsertRedisSuccessCounter() { MsgInsertRedisSuccessCounter = promauto.NewCounter(prometheus.CounterOpts{ Name: "msg_insert_redis_success", diff --git a/pkg/common/prometheus/grpc.go b/pkg/common/prometheus/grpc.go new file mode 100644 index 000000000..d0d513b00 --- /dev/null +++ b/pkg/common/prometheus/grpc.go @@ -0,0 +1,35 @@ +package prometheus + +import ( + "context" + "encoding/json" + "time" + + "Open_IM/pkg/common/log" + + "google.golang.org/grpc" + "google.golang.org/grpc/peer" +) + +func UnaryServerInterceptorProme(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) { + remote, _ := peer.FromContext(ctx) + remoteAddr := remote.Addr.String() + + in, _ := json.Marshal(req) + inStr := string(in) + log.NewInfo("ip", remoteAddr, "access_start", info.FullMethod, "in", inStr) + + start := time.Now() + defer func() { + out, _ := json.Marshal(resp) + outStr := string(out) + duration := int64(time.Since(start) / time.Millisecond) + if duration >= 500 { + log.NewInfo("ip", remoteAddr, "access_end", info.FullMethod, "in", inStr, "out", outStr, "err", err, "duration/ms", duration) + } else { + log.NewInfo("ip", remoteAddr, "access_end", info.FullMethod, "in", inStr, "out", outStr, "err", err, "duration/ms", duration) + } + }() + resp, err = handler(ctx, req) + return +} diff --git a/pkg/common/prometheus/prometheus.go b/pkg/common/prometheus/prometheus.go index 603ac9564..e8bc87b47 100644 --- a/pkg/common/prometheus/prometheus.go +++ b/pkg/common/prometheus/prometheus.go @@ -2,6 +2,7 @@ package prometheus import ( "Open_IM/pkg/common/config" + "bytes" "net/http" "strconv" @@ -26,6 +27,28 @@ func PrometheusHandler() gin.HandlerFunc { } } +type responseBodyWriter struct { + gin.ResponseWriter + body *bytes.Buffer +} + +func (r responseBodyWriter) Write(b []byte) (int, error) { + r.body.Write(b) + return r.ResponseWriter.Write(b) +} + +func PromeTheusMiddleware(c *gin.Context) { + PromeInc(ApiRequestCounter) + w := &responseBodyWriter{body: &bytes.Buffer{}, ResponseWriter: c.Writer} + c.Writer = w + c.Next() + if c.Writer.Status() == http.StatusOK { + PromeInc(ApiRequestSuccessCounter) + } else { + PromeInc(ApiRequestFailedCounter) + } +} + func PromeInc(counter prometheus.Counter) { if config.Config.Prometheus.Enable { if counter != nil {