From 38a880210769d0423a81ff28757be151bcbe69c0 Mon Sep 17 00:00:00 2001 From: icey-yu <119291641+icey-yu@users.noreply.github.com> Date: Tue, 3 Sep 2024 18:32:40 +0800 Subject: [PATCH] feat: add long time push msg in prometheus (#2584) * feat: add long time push msg in prometheus * fix: log print * fix: go mod * fix: log msg * fix: log init * feat: push msg * feat: go mod ,remove cgo package * feat: remove error log * feat: test dummy push * feat:redis pool config * feat: push to kafka log --- config/redis.yml | 1 + go.mod | 2 +- go.sum | 4 ++-- internal/msgtransfer/online_history_msg_handler.go | 7 ++++++- internal/push/offlinepush/dummy/push.go | 2 ++ internal/push/push_handler.go | 3 ++- pkg/common/cmd/root.go | 3 ++- pkg/common/config/config.go | 4 +++- pkg/common/prommetrics/grpc_push.go | 4 ++++ pkg/common/prommetrics/rpc.go | 12 ++++++++++-- pkg/common/storage/cache/redis/batch_handler.go | 2 +- pkg/rpccache/conversation.go | 2 +- 12 files changed, 35 insertions(+), 11 deletions(-) diff --git a/config/redis.yml b/config/redis.yml index 83e305459..2448bcb5c 100644 --- a/config/redis.yml +++ b/config/redis.yml @@ -4,3 +4,4 @@ password: openIM123 clusterMode: false db: 0 maxRetry: 10 +poolSize: 100 diff --git a/go.mod b/go.mod index ae30db056..9723208e4 100644 --- a/go.mod +++ b/go.mod @@ -13,7 +13,7 @@ require ( github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 github.com/mitchellh/mapstructure v1.5.0 github.com/openimsdk/protocol v0.0.72-alpha.9 - github.com/openimsdk/tools v0.0.49-alpha.55 + github.com/openimsdk/tools v0.0.50-alpha.11 github.com/pkg/errors v0.9.1 // indirect github.com/prometheus/client_golang v1.18.0 github.com/stretchr/testify v1.9.0 diff --git a/go.sum b/go.sum index 815b6badf..1a5f77999 100644 --- a/go.sum +++ b/go.sum @@ -321,8 +321,8 @@ github.com/openimsdk/gomake v0.0.14-alpha.5 h1:VY9c5x515lTfmdhhPjMvR3BBRrRquAUCF github.com/openimsdk/gomake v0.0.14-alpha.5/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI= github.com/openimsdk/protocol v0.0.72-alpha.9 h1:Dyx4vs88IU4rJ2YcP/TdYp4ww8JjsMkV89hB/Eazx+A= github.com/openimsdk/protocol v0.0.72-alpha.9/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8= -github.com/openimsdk/tools v0.0.49-alpha.55 h1:KPgC53oqiwZYssLKljhtXbWXifMlTj2SSQEusj4Uf4k= -github.com/openimsdk/tools v0.0.49-alpha.55/go.mod h1:h1cYmfyaVtgFbKmb1Cfsl8XwUOMTt8ubVUQrdGtsUh4= +github.com/openimsdk/tools v0.0.50-alpha.11 h1:ClhkRjUVJWbmOiQ14G6do/ES1a6ZueDITv40Apwq/Tc= +github.com/openimsdk/tools v0.0.50-alpha.11/go.mod h1:h1cYmfyaVtgFbKmb1Cfsl8XwUOMTt8ubVUQrdGtsUh4= github.com/pelletier/go-toml/v2 v2.1.0 h1:FnwAJ4oYMvbT/34k9zzHuZNrhlz48GB3/s6at6/MHO4= github.com/pelletier/go-toml/v2 v2.1.0/go.mod h1:tJU2Z3ZkXwnxa4DPO899bsyIoywizdUvyaeZurnPPDc= github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ= diff --git a/internal/msgtransfer/online_history_msg_handler.go b/internal/msgtransfer/online_history_msg_handler.go index 8c1978d48..6de07cfbc 100644 --- a/internal/msgtransfer/online_history_msg_handler.go +++ b/internal/msgtransfer/online_history_msg_handler.go @@ -237,6 +237,10 @@ func (och *OnlineHistoryRedisConsumerHandler) categorizeMessageLists(totalMsgs [ } func (och *OnlineHistoryRedisConsumerHandler) handleMsg(ctx context.Context, key, conversationID string, storageList, notStorageList []*ContextMsg) { + for _, storageMsg := range storageList { + log.ZDebug(ctx, "handle storage msg", "msg", storageMsg.message.String()) + } + och.toPushTopic(ctx, key, conversationID, notStorageList) var storageMessageList []*sdkws.MsgData for _, msg := range storageList { @@ -311,8 +315,9 @@ func (och *OnlineHistoryRedisConsumerHandler) handleNotification(ctx context.Con } } -func (och *OnlineHistoryRedisConsumerHandler) toPushTopic(_ context.Context, key, conversationID string, msgs []*ContextMsg) { +func (och *OnlineHistoryRedisConsumerHandler) toPushTopic(ctx context.Context, key, conversationID string, msgs []*ContextMsg) { for _, v := range msgs { + log.ZDebug(ctx, "push msg to topic", "msg", v.message.String()) och.msgDatabase.MsgToPushMQ(v.ctx, key, conversationID, v.message) } } diff --git a/internal/push/offlinepush/dummy/push.go b/internal/push/offlinepush/dummy/push.go index 028e7edd3..5698b7294 100644 --- a/internal/push/offlinepush/dummy/push.go +++ b/internal/push/offlinepush/dummy/push.go @@ -17,6 +17,7 @@ package dummy import ( "context" "github.com/openimsdk/open-im-server/v3/internal/push/offlinepush/options" + "github.com/openimsdk/tools/log" ) func NewClient() *Dummy { @@ -27,5 +28,6 @@ type Dummy struct { } func (d *Dummy) Push(ctx context.Context, userIDs []string, title, content string, opts *options.Opts) error { + log.ZInfo(ctx, "dummy push") return nil } diff --git a/internal/push/push_handler.go b/internal/push/push_handler.go index 79d3a9296..5d0359994 100644 --- a/internal/push/push_handler.go +++ b/internal/push/push_handler.go @@ -93,7 +93,8 @@ func (c *ConsumerHandler) handleMs2PsChat(ctx context.Context, msg []byte) { nowSec := timeutil.GetCurrentTimestampBySecond() if nowSec-sec > 10 { - log.ZWarn(ctx, "long time push msg", nil, "msg", pbData.String(), "sec", sec, "nowSec", nowSec, "nowSec-sec", nowSec-sec) + prommetrics.MsgLoneTimePushCounter.Inc() + log.ZWarn(ctx, "it’s been a while since the message was sent", nil, "msg", pbData.String(), "sec", sec, "nowSec", nowSec, "nowSec-sec", nowSec-sec) } var err error switch msgFromMQ.MsgData.SessionType { diff --git a/pkg/common/cmd/root.go b/pkg/common/cmd/root.go index b43f86557..5edea4377 100644 --- a/pkg/common/cmd/root.go +++ b/pkg/common/cmd/root.go @@ -129,10 +129,11 @@ func (r *RootCmd) applyOptions(opts ...func(*CmdOpts)) *CmdOpts { } func (r *RootCmd) initializeLogger(cmdOpts *CmdOpts) error { - err := log.InitFromConfig( + err := log.InitLoggerFromConfig( cmdOpts.loggerPrefixName, r.processName, + "", "", r.log.RemainLogLevel, r.log.IsStdout, r.log.IsJson, diff --git a/pkg/common/config/config.go b/pkg/common/config/config.go index 5261e034c..8bd16178d 100644 --- a/pkg/common/config/config.go +++ b/pkg/common/config/config.go @@ -336,7 +336,8 @@ type Redis struct { Password string `mapstructure:"password"` ClusterMode bool `mapstructure:"clusterMode"` DB int `mapstructure:"storage"` - MaxRetry int `mapstructure:"MaxRetry"` + MaxRetry int `mapstructure:"maxRetry"` + PoolSize int `mapstructure:"poolSize"` } type BeforeConfig struct { @@ -474,6 +475,7 @@ func (r *Redis) Build() *redisutil.Config { Password: r.Password, DB: r.DB, MaxRetry: r.MaxRetry, + PoolSize: r.PoolSize, } } diff --git a/pkg/common/prommetrics/grpc_push.go b/pkg/common/prommetrics/grpc_push.go index 0b6c3e76f..5c966310f 100644 --- a/pkg/common/prommetrics/grpc_push.go +++ b/pkg/common/prommetrics/grpc_push.go @@ -23,4 +23,8 @@ var ( Name: "msg_offline_push_failed_total", Help: "The number of msg failed offline pushed", }) + MsgLoneTimePushCounter = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "msg_long_time_push_total", + Help: "The number of messages with a push time exceeding 10 seconds", + }) ) diff --git a/pkg/common/prommetrics/rpc.go b/pkg/common/prommetrics/rpc.go index dc16322da..7162fa7e8 100644 --- a/pkg/common/prommetrics/rpc.go +++ b/pkg/common/prommetrics/rpc.go @@ -47,9 +47,17 @@ func GetGrpcCusMetrics(registerName string, share *config.Share) []prometheus.Co case share.RpcRegisterName.MessageGateway: return []prometheus.Collector{OnlineUserGauge} case share.RpcRegisterName.Msg: - return []prometheus.Collector{SingleChatMsgProcessSuccessCounter, SingleChatMsgProcessFailedCounter, GroupChatMsgProcessSuccessCounter, GroupChatMsgProcessFailedCounter} + return []prometheus.Collector{ + SingleChatMsgProcessSuccessCounter, + SingleChatMsgProcessFailedCounter, + GroupChatMsgProcessSuccessCounter, + GroupChatMsgProcessFailedCounter, + } case share.RpcRegisterName.Push: - return []prometheus.Collector{MsgOfflinePushFailedCounter} + return []prometheus.Collector{ + MsgOfflinePushFailedCounter, + MsgLoneTimePushCounter, + } case share.RpcRegisterName.Auth: return []prometheus.Collector{UserLoginCounter} case share.RpcRegisterName.User: diff --git a/pkg/common/storage/cache/redis/batch_handler.go b/pkg/common/storage/cache/redis/batch_handler.go index f9923e198..1fbd664a3 100644 --- a/pkg/common/storage/cache/redis/batch_handler.go +++ b/pkg/common/storage/cache/redis/batch_handler.go @@ -118,7 +118,7 @@ func getCache[T any](ctx context.Context, rcClient *rockscache.Client, key strin v, err := rcClient.Fetch2(ctx, key, expire, func() (s string, err error) { t, err = fn(ctx) if err != nil { - log.ZError(ctx, "getCache query database failed", err, "key", key) + //log.ZError(ctx, "getCache query database failed", err, "key", key) return "", err } bs, err := json.Marshal(t) diff --git a/pkg/rpccache/conversation.go b/pkg/rpccache/conversation.go index 2a62c7bbd..925d2a37c 100644 --- a/pkg/rpccache/conversation.go +++ b/pkg/rpccache/conversation.go @@ -86,7 +86,7 @@ func (c *ConversationLocalCache) GetConversation(ctx context.Context, userID, co if err == nil { log.ZDebug(ctx, "ConversationLocalCache GetConversation return", "userID", userID, "conversationID", conversationID, "value", val) } else { - log.ZError(ctx, "ConversationLocalCache GetConversation return", err, "userID", userID, "conversationID", conversationID) + log.ZWarn(ctx, "ConversationLocalCache GetConversation return", err, "userID", userID, "conversationID", conversationID) } }() var cache cacheProto[pbconversation.Conversation]