From e2cacd9c4b1161994766157baabcd3f93264eed2 Mon Sep 17 00:00:00 2001 From: Gordon <1432970085@qq.com> Date: Mon, 9 May 2022 18:23:06 +0800 Subject: [PATCH] test --- go.sum | 2 ++ internal/msg_gateway/gate/init.go | 19 +++++++++++-------- internal/msg_gateway/gate/ws_server.go | 8 ++++---- .../msg_transfer/logic/history_msg_handler.go | 18 ++++++++++-------- internal/push/logic/init.go | 4 ++-- internal/push/logic/push_to_client.go | 2 +- pkg/common/constant/constant.go | 2 ++ pkg/statistics/statistics.go | 10 +++++----- 8 files changed, 37 insertions(+), 28 deletions(-) diff --git a/go.sum b/go.sum index 48d4ea780..f08695e8f 100644 --- a/go.sum +++ b/go.sum @@ -478,6 +478,8 @@ google.golang.org/grpc v1.38.0/go.mod h1:NREThFqKR1f3iQ6oBuvc5LadQuXVGo9rkm5ZGrQ google.golang.org/grpc v1.40.0/go.mod h1:ogyxbiOoUXAkP+4+xa6PZSE9DZgIHtSpzjDTB9KAK34= google.golang.org/grpc v1.42.0 h1:XT2/MFpuPFsEX2fWh3YQtHkZ+WYZFQRfaUgLZYj/p6A= google.golang.org/grpc v1.42.0/go.mod h1:k+4IHHFw41K8+bbowsex27ge2rCb65oeWqe4jJ590SU= +google.golang.org/grpc v1.45.0 h1:NEpgUqV3Z+ZjkqMsxMg11IaDrXY4RY6CQukSGK0uI1M= +google.golang.org/grpc v1.45.0/go.mod h1:lN7owxKUQEqMfSyQikvvk5tf/6zMPsrK+ONuO11+0rQ= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= diff --git a/internal/msg_gateway/gate/init.go b/internal/msg_gateway/gate/init.go index 8f2f2e35b..5cf682b84 100644 --- a/internal/msg_gateway/gate/init.go +++ b/internal/msg_gateway/gate/init.go @@ -2,6 +2,7 @@ package gate import ( "Open_IM/pkg/common/config" + "Open_IM/pkg/common/constant" "Open_IM/pkg/statistics" "fmt" @@ -10,12 +11,14 @@ import ( ) var ( - rwLock *sync.RWMutex - validate *validator.Validate - ws WServer - rpcSvr RPCServer - sendMsgCount uint64 - userCount uint64 + rwLock *sync.RWMutex + validate *validator.Validate + ws WServer + rpcSvr RPCServer + sendMsgAllCount uint64 + sendMsgFailedCount uint64 + sendMsgSuccessCount uint64 + userCount uint64 ) func Init(rpcPort, wsPort int) { @@ -23,8 +26,8 @@ func Init(rpcPort, wsPort int) { rwLock = new(sync.RWMutex) validate = validator.New() - statistics.NewStatistics(&sendMsgCount, config.Config.ModuleName.LongConnSvrName, fmt.Sprintf("%d second recv to msg_gateway sendMsgCount", sendMsgCount), 300) - statistics.NewStatistics(&userCount, config.Config.ModuleName.LongConnSvrName, fmt.Sprintf("%d second add user conn", userCount), 300) + statistics.NewStatistics(&sendMsgAllCount, config.Config.ModuleName.LongConnSvrName, fmt.Sprintf("%d second recv to msg_gateway sendMsgCount", constant.StatisticsTimeInterval), constant.StatisticsTimeInterval) + statistics.NewStatistics(&userCount, config.Config.ModuleName.LongConnSvrName, fmt.Sprintf("%d second add user conn", constant.StatisticsTimeInterval), constant.StatisticsTimeInterval) ws.onInit(wsPort) rpcSvr.onInit(rpcPort) } diff --git a/internal/msg_gateway/gate/ws_server.go b/internal/msg_gateway/gate/ws_server.go index e6f3a317e..4ce4e8648 100644 --- a/internal/msg_gateway/gate/ws_server.go +++ b/internal/msg_gateway/gate/ws_server.go @@ -62,6 +62,7 @@ func (ws *WServer) wsHandler(w http.ResponseWriter, r *http.Request) { //Initialize a lock for each user newConn := &UserConn{conn, new(sync.Mutex)} + userCount++ ws.addUserConn(query["sendID"][0], int32(utils.StringToInt64(query["platformID"][0])), newConn, query["token"][0]) go ws.readMsg(newConn) } @@ -77,6 +78,7 @@ func (ws *WServer) readMsg(conn *UserConn) { if err != nil { uid, platform := ws.getUserUid(conn) log.ErrorByKv("WS ReadMsg error", "", "userIP", conn.RemoteAddr().String(), "userUid", uid, "platform", platform, "error", err.Error()) + userCount-- ws.delUserConn(conn) return } else { @@ -189,7 +191,6 @@ func (ws *WServer) addUserConn(uid string, platformID int32, conn *UserConn, tok count = count + len(v) } log.Debug(operationID, "WS Add operation", "", "wsUser added", ws.wsUserToConn, "connection_uid", uid, "connection_platform", constant.PlatformIDToName(platformID), "online_user_num", len(ws.wsUserToConn), "online_conn_num", count) - userCount = uint64(len(ws.wsUserToConn)) } @@ -213,11 +214,10 @@ func (ws *WServer) delUserConn(conn *UserConn) { for _, v := range ws.wsUserToConn { count = count + len(v) } - log.NewWarn(operationID, "WS delete operation", "", "wsUser deleted", ws.wsUserToConn, "disconnection_uid", uid, "disconnection_platform", platform, "online_user_num", len(ws.wsUserToConn), "online_conn_num", count) + log.Debug(operationID, "WS delete operation", "", "wsUser deleted", ws.wsUserToConn, "disconnection_uid", uid, "disconnection_platform", platform, "online_user_num", len(ws.wsUserToConn), "online_conn_num", count) } else { - log.NewWarn(operationID, "WS delete operation", "", "wsUser deleted", ws.wsUserToConn, "disconnection_uid", uid, "disconnection_platform", platform, "online_user_num", len(ws.wsUserToConn)) + log.Debug(operationID, "WS delete operation", "", "wsUser deleted", ws.wsUserToConn, "disconnection_uid", uid, "disconnection_platform", platform, "online_user_num", len(ws.wsUserToConn)) } - userCount = uint64(len(ws.wsUserToConn)) delete(ws.wsConnToUser, conn) } diff --git a/internal/msg_transfer/logic/history_msg_handler.go b/internal/msg_transfer/logic/history_msg_handler.go index f7acea5d6..91179ca0e 100644 --- a/internal/msg_transfer/logic/history_msg_handler.go +++ b/internal/msg_transfer/logic/history_msg_handler.go @@ -11,6 +11,7 @@ import ( "Open_IM/pkg/statistics" "Open_IM/pkg/utils" "context" + "fmt" "github.com/Shopify/sarama" "github.com/golang/protobuf/proto" "strings" @@ -20,15 +21,16 @@ import ( type fcb func(msg []byte, msgKey string) type HistoryConsumerHandler struct { - msgHandle map[string]fcb - historyConsumerGroup *kfk.MConsumerGroup - singleMsgCount uint64 - groupMsgCount uint64 + msgHandle map[string]fcb + historyConsumerGroup *kfk.MConsumerGroup + singleMsgFailedCount uint64 + singleMsgSuccessCount uint64 + groupMsgCount uint64 } func (mc *HistoryConsumerHandler) Init() { - statistics.NewStatistics(&mc.singleMsgCount, config.Config.ModuleName.MsgTransferName, "singleMsgCount insert to mongo ", 300) - statistics.NewStatistics(&mc.groupMsgCount, config.Config.ModuleName.MsgTransferName, "groupMsgCount insert to mongo ", 300) + statistics.NewStatistics(&mc.singleMsgSuccessCount, config.Config.ModuleName.MsgTransferName, fmt.Sprintf("%d second singleMsgCount insert to mongo", constant.StatisticsTimeInterval), constant.StatisticsTimeInterval) + statistics.NewStatistics(&mc.groupMsgCount, config.Config.ModuleName.MsgTransferName, fmt.Sprintf("%d second groupMsgCount insert to mongo", constant.StatisticsTimeInterval), constant.StatisticsTimeInterval) mc.msgHandle = make(map[string]fcb) mc.msgHandle[config.Config.Kafka.Ws2mschat.Topic] = mc.handleChatWs2Mongo @@ -59,10 +61,11 @@ func (mc *HistoryConsumerHandler) handleChatWs2Mongo(msg []byte, msgKey string) if isHistory { err := saveUserChat(msgKey, &msgFromMQ) if err != nil { + mc.singleMsgFailedCount++ log.NewError(operationID, "single data insert to mongo err", err.Error(), msgFromMQ.String()) return } - mc.singleMsgCount++ + mc.singleMsgSuccessCount++ log.NewDebug(msgFromMQ.OperationID, "sendMessageToPush cost time ", time.Since(now)) } if !isSenderSync && msgKey == msgFromMQ.MsgData.SendID { @@ -89,7 +92,6 @@ func (mc *HistoryConsumerHandler) handleChatWs2Mongo(msg []byte, msgKey string) log.NewError(operationID, "single data insert to mongo err", err.Error(), msgFromMQ.String()) return } - mc.singleMsgCount++ log.NewDebug(msgFromMQ.OperationID, "sendMessageToPush cost time ", time.Since(now)) } if !isSenderSync && msgKey == msgFromMQ.MsgData.SendID { diff --git a/internal/push/logic/init.go b/internal/push/logic/init.go index d2d127091..75902e7f8 100644 --- a/internal/push/logic/init.go +++ b/internal/push/logic/init.go @@ -19,7 +19,7 @@ var ( pushCh PushConsumerHandler pushTerminal []int32 producer *kafka.Producer - count uint64 + successCount uint64 ) func Init(rpcPort int) { @@ -30,7 +30,7 @@ func Init(rpcPort int) { } func init() { producer = kafka.NewKafkaProducer(config.Config.Kafka.Ws2mschat.Addr, config.Config.Kafka.Ws2mschat.Topic) - statistics.NewStatistics(&count, config.Config.ModuleName.PushName, fmt.Sprintf("%d second push to msg_gateway count", 300), 300) + statistics.NewStatistics(&successCount, config.Config.ModuleName.PushName, fmt.Sprintf("%d second push to msg_gateway count", constant.StatisticsTimeInterval), constant.StatisticsTimeInterval) } func Run() { diff --git a/internal/push/logic/push_to_client.go b/internal/push/logic/push_to_client.go index 27682ba5b..b8125be5a 100644 --- a/internal/push/logic/push_to_client.go +++ b/internal/push/logic/push_to_client.go @@ -53,7 +53,7 @@ func MsgToUser(pushMsg *pbPush.PushMsgReq) { } } log.NewInfo(pushMsg.OperationID, "push_result", wsResult, "sendData", pushMsg.MsgData) - count++ + successCount++ if isOfflinePush && pushMsg.PushToUserID != pushMsg.MsgData.SendID { for _, v := range wsResult { if v.ResultCode == 0 { diff --git a/pkg/common/constant/constant.go b/pkg/common/constant/constant.go index 2e624dd5e..8a95898cd 100644 --- a/pkg/common/constant/constant.go +++ b/pkg/common/constant/constant.go @@ -267,3 +267,5 @@ func GroupIsBanPrivateChat(status int32) bool { const BigVersion = "v3" const LogFileName = "OpenIM.log" + +const StatisticsTimeInterval = 300 diff --git a/pkg/statistics/statistics.go b/pkg/statistics/statistics.go index bea4479bd..633eafc80 100644 --- a/pkg/statistics/statistics.go +++ b/pkg/statistics/statistics.go @@ -6,7 +6,7 @@ import ( ) type Statistics struct { - Count *uint64 + AllCount *uint64 ModuleName string PrintArgs string SleepTime int @@ -17,16 +17,16 @@ func (s *Statistics) output() { defer t.Stop() var sum uint64 for { - sum = *s.Count + sum = *s.AllCount select { case <-t.C: } - log.NewWarn("", " system stat ", s.ModuleName, s.PrintArgs, *s.Count-sum, "total:", *s.Count) + log.NewWarn("", " system stat ", s.ModuleName, s.PrintArgs, *s.AllCount-sum, "total:", *s.AllCount) } } -func NewStatistics(count *uint64, moduleName, printArgs string, sleepTime int) *Statistics { - p := &Statistics{Count: count, ModuleName: moduleName, SleepTime: sleepTime, PrintArgs: printArgs} +func NewStatistics(allCount *uint64, moduleName, printArgs string, sleepTime int) *Statistics { + p := &Statistics{AllCount: allCount, ModuleName: moduleName, SleepTime: sleepTime, PrintArgs: printArgs} go p.output() return p }