Merge remote-tracking branch 'origin/v2.3.0release' into v2.3.0release

# Conflicts:
#	internal/rpc/msg/rpcChat.go
#	pkg/common/prometheus/gather.go
pull/351/head
skiffer-git 2 years ago
commit d6abbacbf7

@ -52,6 +52,10 @@ func main() {
log.Info("load config: ", config.Config)
r.GET("/swagger/*any", ginSwagger.WrapHandler(swaggerFiles.Handler))
if config.Config.Prometheus.Enable {
promePkg.NewApiRequestCounter()
promePkg.NewApiRequestFailedCounter()
promePkg.NewApiRequestSuccessCounter()
r.Use(promePkg.PromeTheusMiddleware)
r.GET("/metrics", promePkg.PrometheusHandler())
}
// user routing group, which handles user registration and login services

@ -164,6 +164,7 @@ services:
depends_on:
- prometheus
network_mode: "host"
# -rw-r-----
node-exporter:
image: quay.io/prometheus/node-exporter

@ -1105,7 +1105,7 @@ disable_sanitize_html = false
enable_alpha = false
app_tls_skip_verify_insecure = false
# Enter a comma-separated list of plugin identifiers to identify plugins to load even if they are unsigned. Plugins with modified signatures are never loaded.
allow_loading_unsigned_plugins =
allow_loading_unsigned_plugins = grafana-simple-json-backend-datasource
# Enable or disable installing / uninstalling / updating plugins directly from within Grafana.
plugin_admin_enabled = true
plugin_admin_external_manage_enabled = false

@ -13,11 +13,12 @@ import (
"bytes"
"context"
"encoding/gob"
"github.com/golang/protobuf/proto"
"net"
"strconv"
"strings"
"github.com/golang/protobuf/proto"
"github.com/gorilla/websocket"
"google.golang.org/grpc"
)
@ -63,7 +64,14 @@ func (r *RPCServer) run() {
panic("listening err:" + err.Error() + r.rpcRegisterName)
}
defer listener.Close()
srv := grpc.NewServer()
var grpcOpts []grpc.ServerOption
if config.Config.Prometheus.Enable {
promePkg.NewGrpcRequestCounter()
promePkg.NewGrpcRequestFailedCounter()
promePkg.NewGrpcRequestSuccessCounter()
grpcOpts = append(grpcOpts, grpc.UnaryInterceptor(promePkg.UnaryServerInterceptorProme))
}
srv := grpc.NewServer(grpcOpts...)
defer srv.GracefulStop()
pbRelay.RegisterRelayServer(srv, r)

@ -37,7 +37,9 @@ var (
func Init() {
cmdCh = make(chan Cmd2Value, 10000)
w = new(sync.Mutex)
if config.Config.Prometheus.Enable {
initPrometheus()
}
persistentCH.Init() // ws2mschat save mysql
historyCH.Init(cmdCh) //
historyMongoCH.Init()

@ -4,6 +4,7 @@ import (
"Open_IM/pkg/common/config"
"Open_IM/pkg/common/constant"
"Open_IM/pkg/common/log"
promePkg "Open_IM/pkg/common/prometheus"
"Open_IM/pkg/grpc-etcdv3/getcdv3"
pbPush "Open_IM/pkg/proto/push"
"Open_IM/pkg/utils"
@ -42,7 +43,14 @@ func (r *RPCServer) run() {
panic("listening err:" + err.Error() + r.rpcRegisterName)
}
defer listener.Close()
srv := grpc.NewServer()
var grpcOpts []grpc.ServerOption
if config.Config.Prometheus.Enable {
promePkg.NewGrpcRequestCounter()
promePkg.NewGrpcRequestFailedCounter()
promePkg.NewGrpcRequestSuccessCounter()
grpcOpts = append(grpcOpts, grpc.UnaryInterceptor(promePkg.UnaryServerInterceptorProme))
}
srv := grpc.NewServer(grpcOpts...)
defer srv.GracefulStop()
pbPush.RegisterPushMsgServiceServer(srv, r)
rpcRegisterIP := config.Config.RpcRegisterIP

@ -6,6 +6,7 @@ import (
"Open_IM/pkg/common/db"
imdb "Open_IM/pkg/common/db/mysql_model/im_mysql_model"
"Open_IM/pkg/common/log"
promePkg "Open_IM/pkg/common/prometheus"
"Open_IM/pkg/common/token_verify"
"Open_IM/pkg/grpc-etcdv3/getcdv3"
pbAdminCMS "Open_IM/pkg/proto/admin_cms"
@ -57,8 +58,14 @@ func (s *adminCMSServer) Run() {
}
log.NewInfo("0", "listen network success, ", address, listener)
defer listener.Close()
//grpc server
srv := grpc.NewServer()
var grpcOpts []grpc.ServerOption
if config.Config.Prometheus.Enable {
promePkg.NewGrpcRequestCounter()
promePkg.NewGrpcRequestFailedCounter()
promePkg.NewGrpcRequestSuccessCounter()
grpcOpts = append(grpcOpts, grpc.UnaryInterceptor(promePkg.UnaryServerInterceptorProme))
}
srv := grpc.NewServer(grpcOpts...)
defer srv.GracefulStop()
//Service registers with etcd
pbAdminCMS.RegisterAdminCMSServer(srv, s)

@ -22,11 +22,6 @@ import (
"google.golang.org/grpc"
)
func (rpc *rpcAuth) initPrometheus() {
promePkg.NewUserLoginCounter()
promePkg.NewUserRegisterCounter()
}
func (rpc *rpcAuth) UserRegister(_ context.Context, req *pbAuth.UserRegisterReq) (*pbAuth.UserRegisterResp, error) {
log.NewInfo(req.OperationID, utils.GetSelfFuncName(), " rpc args ", req.String())
var user db.User
@ -126,8 +121,16 @@ func (rpc *rpcAuth) Run() {
panic("listening err:" + err.Error() + rpc.rpcRegisterName)
}
log.NewInfo(operationID, "listen network success, ", address, listener)
//grpc server
srv := grpc.NewServer()
var grpcOpts []grpc.ServerOption
if config.Config.Prometheus.Enable {
promePkg.NewGrpcRequestCounter()
promePkg.NewGrpcRequestFailedCounter()
promePkg.NewGrpcRequestSuccessCounter()
promePkg.NewUserRegisterCounter()
promePkg.NewUserLoginCounter()
grpcOpts = append(grpcOpts, grpc.UnaryInterceptor(promePkg.UnaryServerInterceptorProme))
}
srv := grpc.NewServer(grpcOpts...)
defer srv.GracefulStop()
//service registers with etcd
@ -149,9 +152,6 @@ func (rpc *rpcAuth) Run() {
}
log.NewInfo(operationID, "RegisterAuthServer ok ", rpc.etcdSchema, strings.Join(rpc.etcdAddr, ","), rpcRegisterIP, rpc.rpcPort, rpc.rpcRegisterName)
if config.Config.Prometheus.Enable {
rpc.initPrometheus()
}
err = srv.Serve(listener)
if err != nil {
log.NewError(operationID, "Serve failed ", err.Error())

@ -7,6 +7,7 @@ import (
imdb "Open_IM/pkg/common/db/mysql_model/im_mysql_model"
rocksCache "Open_IM/pkg/common/db/rocks_cache"
"Open_IM/pkg/common/log"
promePkg "Open_IM/pkg/common/prometheus"
"Open_IM/pkg/grpc-etcdv3/getcdv3"
pbConversation "Open_IM/pkg/proto/conversation"
"Open_IM/pkg/utils"
@ -184,7 +185,14 @@ func (rpc *rpcConversation) Run() {
}
log.NewInfo("0", "listen network success, ", address, listener)
//grpc server
srv := grpc.NewServer()
var grpcOpts []grpc.ServerOption
if config.Config.Prometheus.Enable {
promePkg.NewGrpcRequestCounter()
promePkg.NewGrpcRequestFailedCounter()
promePkg.NewGrpcRequestSuccessCounter()
grpcOpts = append(grpcOpts, grpc.UnaryInterceptor(promePkg.UnaryServerInterceptorProme))
}
srv := grpc.NewServer(grpcOpts...)
defer srv.GracefulStop()
//service registers with etcd

@ -6,8 +6,9 @@ import (
"Open_IM/pkg/common/constant"
"Open_IM/pkg/common/db"
imdb "Open_IM/pkg/common/db/mysql_model/im_mysql_model"
"Open_IM/pkg/common/db/rocks_cache"
rocksCache "Open_IM/pkg/common/db/rocks_cache"
"Open_IM/pkg/common/log"
promePkg "Open_IM/pkg/common/prometheus"
"Open_IM/pkg/common/token_verify"
cp "Open_IM/pkg/common/utils"
"Open_IM/pkg/grpc-etcdv3/getcdv3"
@ -60,7 +61,14 @@ func (s *friendServer) Run() {
log.NewInfo("0", "listen ok ", address)
defer listener.Close()
//grpc server
srv := grpc.NewServer()
var grpcOpts []grpc.ServerOption
if config.Config.Prometheus.Enable {
promePkg.NewGrpcRequestCounter()
promePkg.NewGrpcRequestFailedCounter()
promePkg.NewGrpcRequestSuccessCounter()
grpcOpts = append(grpcOpts, grpc.UnaryInterceptor(promePkg.UnaryServerInterceptorProme))
}
srv := grpc.NewServer(grpcOpts...)
defer srv.GracefulStop()
//User friend related services register to etcd
pbFriend.RegisterFriendServer(srv, s)

@ -8,6 +8,7 @@ import (
imdb "Open_IM/pkg/common/db/mysql_model/im_mysql_model"
rocksCache "Open_IM/pkg/common/db/rocks_cache"
"Open_IM/pkg/common/log"
promePkg "Open_IM/pkg/common/prometheus"
"Open_IM/pkg/common/token_verify"
cp "Open_IM/pkg/common/utils"
"Open_IM/pkg/grpc-etcdv3/getcdv3"
@ -69,6 +70,13 @@ func (s *groupServer) Run() {
grpc.MaxRecvMsgSize(recvSize),
grpc.MaxSendMsgSize(sendSize),
}
var grpcOpts []grpc.ServerOption
if config.Config.Prometheus.Enable {
promePkg.NewGrpcRequestCounter()
promePkg.NewGrpcRequestFailedCounter()
promePkg.NewGrpcRequestSuccessCounter()
grpcOpts = append(grpcOpts, grpc.UnaryInterceptor(promePkg.UnaryServerInterceptorProme))
}
srv := grpc.NewServer(options...)
defer srv.GracefulStop()
//Service registers with etcd

@ -6,6 +6,7 @@ import (
"Open_IM/pkg/common/db"
"Open_IM/pkg/common/kafka"
"Open_IM/pkg/common/log"
promePkg "Open_IM/pkg/common/prometheus"
"Open_IM/pkg/grpc-etcdv3/getcdv3"
"Open_IM/pkg/proto/msg"
"Open_IM/pkg/utils"
@ -13,7 +14,6 @@ import (
"strconv"
"strings"
promePkg "Open_IM/pkg/common/prometheus"
"google.golang.org/grpc"
)
@ -94,7 +94,14 @@ func (rpc *rpcChat) Run() {
}
log.Info("", "listen network success, address ", address)
srv := grpc.NewServer()
var grpcOpts []grpc.ServerOption
if config.Config.Prometheus.Enable {
promePkg.NewGrpcRequestCounter()
promePkg.NewGrpcRequestFailedCounter()
promePkg.NewGrpcRequestSuccessCounter()
grpcOpts = append(grpcOpts, grpc.UnaryInterceptor(promePkg.UnaryServerInterceptorProme))
}
srv := grpc.NewServer(grpcOpts...)
defer srv.GracefulStop()
rpcRegisterIP := config.Config.RpcRegisterIP

@ -8,6 +8,7 @@ import (
"Open_IM/pkg/common/db/mysql_model/im_mysql_model"
imdb "Open_IM/pkg/common/db/mysql_model/im_mysql_model"
"Open_IM/pkg/common/log"
promePkg "Open_IM/pkg/common/prometheus"
"Open_IM/pkg/grpc-etcdv3/getcdv3"
pbCache "Open_IM/pkg/proto/cache"
pbOffice "Open_IM/pkg/proto/office"
@ -63,11 +64,17 @@ func (s *officeServer) Run() {
//grpc server
recvSize := 1024 * 1024 * 30
sendSize := 1024 * 1024 * 30
var options = []grpc.ServerOption{
var grpcOpts = []grpc.ServerOption{
grpc.MaxRecvMsgSize(recvSize),
grpc.MaxSendMsgSize(sendSize),
}
srv := grpc.NewServer(options...)
if config.Config.Prometheus.Enable {
promePkg.NewGrpcRequestCounter()
promePkg.NewGrpcRequestFailedCounter()
promePkg.NewGrpcRequestSuccessCounter()
grpcOpts = append(grpcOpts, grpc.UnaryInterceptor(promePkg.UnaryServerInterceptorProme))
}
srv := grpc.NewServer(grpcOpts...)
defer srv.GracefulStop()
//Service registers with etcd
pbOffice.RegisterOfficeServiceServer(srv, s)

@ -8,6 +8,7 @@ import (
imdb "Open_IM/pkg/common/db/mysql_model/im_mysql_model"
rocksCache "Open_IM/pkg/common/db/rocks_cache"
"Open_IM/pkg/common/log"
promePkg "Open_IM/pkg/common/prometheus"
"Open_IM/pkg/common/token_verify"
"Open_IM/pkg/grpc-etcdv3/getcdv3"
pbAuth "Open_IM/pkg/proto/auth"
@ -58,7 +59,14 @@ func (s *organizationServer) Run() {
log.NewInfo("", "listen network success, ", address, listener)
defer listener.Close()
//grpc server
srv := grpc.NewServer()
var grpcOpts []grpc.ServerOption
if config.Config.Prometheus.Enable {
promePkg.NewGrpcRequestCounter()
promePkg.NewGrpcRequestFailedCounter()
promePkg.NewGrpcRequestSuccessCounter()
grpcOpts = append(grpcOpts, grpc.UnaryInterceptor(promePkg.UnaryServerInterceptorProme))
}
srv := grpc.NewServer(grpcOpts...)
defer srv.GracefulStop()
//Service registers with etcd
rpc.RegisterOrganizationServer(srv, s)

@ -8,6 +8,7 @@ import (
imdb "Open_IM/pkg/common/db/mysql_model/im_mysql_model"
rocksCache "Open_IM/pkg/common/db/rocks_cache"
"Open_IM/pkg/common/log"
promePkg "Open_IM/pkg/common/prometheus"
"Open_IM/pkg/common/token_verify"
"Open_IM/pkg/grpc-etcdv3/getcdv3"
pbConversation "Open_IM/pkg/proto/conversation"
@ -61,7 +62,14 @@ func (s *userServer) Run() {
log.NewInfo("0", "listen network success, address ", address, listener)
defer listener.Close()
//grpc server
srv := grpc.NewServer()
var grpcOpts []grpc.ServerOption
if config.Config.Prometheus.Enable {
promePkg.NewGrpcRequestCounter()
promePkg.NewGrpcRequestFailedCounter()
promePkg.NewGrpcRequestSuccessCounter()
grpcOpts = append(grpcOpts, grpc.UnaryInterceptor(promePkg.UnaryServerInterceptorProme))
}
srv := grpc.NewServer(grpcOpts...)
defer srv.GracefulStop()
//Service registers with etcd
pbUser.RegisterUserServer(srv, s)

@ -4,8 +4,11 @@ import (
log "Open_IM/pkg/common/log"
"Open_IM/pkg/utils"
"errors"
"github.com/Shopify/sarama"
"github.com/golang/protobuf/proto"
promePkg "Open_IM/pkg/common/prometheus"
)
type Producer struct {
@ -57,5 +60,8 @@ func (p *Producer) SendMessage(m proto.Message, key string, operationID string)
}
a, b, c := p.producer.SendMessage(kMsg)
log.Info(operationID, "ByteEncoder SendMessage end", "key ", kMsg.Key.Length(), kMsg.Value.Length(), p.producer)
if c == nil {
promePkg.PromeInc(promePkg.SendMsgCounter)
}
return a, b, utils.Wrap(c, "")
}

@ -6,6 +6,7 @@ import (
)
var (
//auth rpc
UserLoginCounter prometheus.Counter
UserRegisterCounter prometheus.Counter
@ -46,6 +47,18 @@ var (
MsgOnlinePushSuccessCounter prometheus.Counter
MsgOfflinePushSuccessCounter prometheus.Counter
MsgOfflinePushFailedCounter prometheus.Counter
// api
ApiRequestCounter prometheus.Counter
ApiRequestSuccessCounter prometheus.Counter
ApiRequestFailedCounter prometheus.Counter
// grpc
GrpcRequestCounter prometheus.Counter
GrpcRequestSuccessCounter prometheus.Counter
GrpcRequestFailedCounter prometheus.Counter
SendMsgCounter prometheus.Counter
)
func NewUserLoginCounter() {
@ -87,6 +100,55 @@ func NewSeqSetFailedCounter() {
})
}
func NewApiRequestCounter() {
ApiRequestCounter = promauto.NewCounter(prometheus.CounterOpts{
Name: "api_request",
Help: "The number of api request",
})
}
func NewApiRequestSuccessCounter() {
ApiRequestSuccessCounter = promauto.NewCounter(prometheus.CounterOpts{
Name: "api_request_success",
Help: "The number of api request success",
})
}
func NewApiRequestFailedCounter() {
ApiRequestFailedCounter = promauto.NewCounter(prometheus.CounterOpts{
Name: "api_request_failed",
Help: "The number of api request failed",
})
}
func NewGrpcRequestCounter() {
GrpcRequestCounter = promauto.NewCounter(prometheus.CounterOpts{
Name: "grpc_request",
Help: "The number of api request",
})
}
func NewGrpcRequestSuccessCounter() {
GrpcRequestSuccessCounter = promauto.NewCounter(prometheus.CounterOpts{
Name: "grpc_request_success",
Help: "The number of grpc request success",
})
}
func NewGrpcRequestFailedCounter() {
GrpcRequestFailedCounter = promauto.NewCounter(prometheus.CounterOpts{
Name: "grpc_request_failed",
Help: "The number of grpc request failed",
})
}
func NewSendMsgCount() {
SendMsgCounter = promauto.NewCounter(prometheus.CounterOpts{
Name: "send_msg",
Help: "The number of send msg",
})
}
func NewMsgInsertRedisSuccessCounter() {
MsgInsertRedisSuccessCounter = promauto.NewCounter(prometheus.CounterOpts{
Name: "msg_insert_redis_success",

@ -0,0 +1,35 @@
package prometheus
import (
"context"
"encoding/json"
"time"
"Open_IM/pkg/common/log"
"google.golang.org/grpc"
"google.golang.org/grpc/peer"
)
func UnaryServerInterceptorProme(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
remote, _ := peer.FromContext(ctx)
remoteAddr := remote.Addr.String()
in, _ := json.Marshal(req)
inStr := string(in)
log.NewInfo("ip", remoteAddr, "access_start", info.FullMethod, "in", inStr)
start := time.Now()
defer func() {
out, _ := json.Marshal(resp)
outStr := string(out)
duration := int64(time.Since(start) / time.Millisecond)
if duration >= 500 {
log.NewInfo("ip", remoteAddr, "access_end", info.FullMethod, "in", inStr, "out", outStr, "err", err, "duration/ms", duration)
} else {
log.NewInfo("ip", remoteAddr, "access_end", info.FullMethod, "in", inStr, "out", outStr, "err", err, "duration/ms", duration)
}
}()
resp, err = handler(ctx, req)
return
}

@ -2,6 +2,7 @@ package prometheus
import (
"Open_IM/pkg/common/config"
"bytes"
"net/http"
"strconv"
@ -26,6 +27,28 @@ func PrometheusHandler() gin.HandlerFunc {
}
}
type responseBodyWriter struct {
gin.ResponseWriter
body *bytes.Buffer
}
func (r responseBodyWriter) Write(b []byte) (int, error) {
r.body.Write(b)
return r.ResponseWriter.Write(b)
}
func PromeTheusMiddleware(c *gin.Context) {
PromeInc(ApiRequestCounter)
w := &responseBodyWriter{body: &bytes.Buffer{}, ResponseWriter: c.Writer}
c.Writer = w
c.Next()
if c.Writer.Status() == http.StatusOK {
PromeInc(ApiRequestSuccessCounter)
} else {
PromeInc(ApiRequestFailedCounter)
}
}
func PromeInc(counter prometheus.Counter) {
if config.Config.Prometheus.Enable {
if counter != nil {

Loading…
Cancel
Save