pull/232/head
Gordon 3 years ago
parent d5c9e08888
commit e2cacd9c4b

@ -478,6 +478,8 @@ google.golang.org/grpc v1.38.0/go.mod h1:NREThFqKR1f3iQ6oBuvc5LadQuXVGo9rkm5ZGrQ
google.golang.org/grpc v1.40.0/go.mod h1:ogyxbiOoUXAkP+4+xa6PZSE9DZgIHtSpzjDTB9KAK34=
google.golang.org/grpc v1.42.0 h1:XT2/MFpuPFsEX2fWh3YQtHkZ+WYZFQRfaUgLZYj/p6A=
google.golang.org/grpc v1.42.0/go.mod h1:k+4IHHFw41K8+bbowsex27ge2rCb65oeWqe4jJ590SU=
google.golang.org/grpc v1.45.0 h1:NEpgUqV3Z+ZjkqMsxMg11IaDrXY4RY6CQukSGK0uI1M=
google.golang.org/grpc v1.45.0/go.mod h1:lN7owxKUQEqMfSyQikvvk5tf/6zMPsrK+ONuO11+0rQ=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=

@ -2,6 +2,7 @@ package gate
import (
"Open_IM/pkg/common/config"
"Open_IM/pkg/common/constant"
"Open_IM/pkg/statistics"
"fmt"
@ -14,7 +15,9 @@ var (
validate *validator.Validate
ws WServer
rpcSvr RPCServer
sendMsgCount uint64
sendMsgAllCount uint64
sendMsgFailedCount uint64
sendMsgSuccessCount uint64
userCount uint64
)
@ -23,8 +26,8 @@ func Init(rpcPort, wsPort int) {
rwLock = new(sync.RWMutex)
validate = validator.New()
statistics.NewStatistics(&sendMsgCount, config.Config.ModuleName.LongConnSvrName, fmt.Sprintf("%d second recv to msg_gateway sendMsgCount", sendMsgCount), 300)
statistics.NewStatistics(&userCount, config.Config.ModuleName.LongConnSvrName, fmt.Sprintf("%d second add user conn", userCount), 300)
statistics.NewStatistics(&sendMsgAllCount, config.Config.ModuleName.LongConnSvrName, fmt.Sprintf("%d second recv to msg_gateway sendMsgCount", constant.StatisticsTimeInterval), constant.StatisticsTimeInterval)
statistics.NewStatistics(&userCount, config.Config.ModuleName.LongConnSvrName, fmt.Sprintf("%d second add user conn", constant.StatisticsTimeInterval), constant.StatisticsTimeInterval)
ws.onInit(wsPort)
rpcSvr.onInit(rpcPort)
}

@ -62,6 +62,7 @@ func (ws *WServer) wsHandler(w http.ResponseWriter, r *http.Request) {
//Initialize a lock for each user
newConn := &UserConn{conn, new(sync.Mutex)}
userCount++
ws.addUserConn(query["sendID"][0], int32(utils.StringToInt64(query["platformID"][0])), newConn, query["token"][0])
go ws.readMsg(newConn)
}
@ -77,6 +78,7 @@ func (ws *WServer) readMsg(conn *UserConn) {
if err != nil {
uid, platform := ws.getUserUid(conn)
log.ErrorByKv("WS ReadMsg error", "", "userIP", conn.RemoteAddr().String(), "userUid", uid, "platform", platform, "error", err.Error())
userCount--
ws.delUserConn(conn)
return
} else {
@ -189,7 +191,6 @@ func (ws *WServer) addUserConn(uid string, platformID int32, conn *UserConn, tok
count = count + len(v)
}
log.Debug(operationID, "WS Add operation", "", "wsUser added", ws.wsUserToConn, "connection_uid", uid, "connection_platform", constant.PlatformIDToName(platformID), "online_user_num", len(ws.wsUserToConn), "online_conn_num", count)
userCount = uint64(len(ws.wsUserToConn))
}
@ -213,11 +214,10 @@ func (ws *WServer) delUserConn(conn *UserConn) {
for _, v := range ws.wsUserToConn {
count = count + len(v)
}
log.NewWarn(operationID, "WS delete operation", "", "wsUser deleted", ws.wsUserToConn, "disconnection_uid", uid, "disconnection_platform", platform, "online_user_num", len(ws.wsUserToConn), "online_conn_num", count)
log.Debug(operationID, "WS delete operation", "", "wsUser deleted", ws.wsUserToConn, "disconnection_uid", uid, "disconnection_platform", platform, "online_user_num", len(ws.wsUserToConn), "online_conn_num", count)
} else {
log.NewWarn(operationID, "WS delete operation", "", "wsUser deleted", ws.wsUserToConn, "disconnection_uid", uid, "disconnection_platform", platform, "online_user_num", len(ws.wsUserToConn))
log.Debug(operationID, "WS delete operation", "", "wsUser deleted", ws.wsUserToConn, "disconnection_uid", uid, "disconnection_platform", platform, "online_user_num", len(ws.wsUserToConn))
}
userCount = uint64(len(ws.wsUserToConn))
delete(ws.wsConnToUser, conn)
}

@ -11,6 +11,7 @@ import (
"Open_IM/pkg/statistics"
"Open_IM/pkg/utils"
"context"
"fmt"
"github.com/Shopify/sarama"
"github.com/golang/protobuf/proto"
"strings"
@ -22,13 +23,14 @@ type fcb func(msg []byte, msgKey string)
type HistoryConsumerHandler struct {
msgHandle map[string]fcb
historyConsumerGroup *kfk.MConsumerGroup
singleMsgCount uint64
singleMsgFailedCount uint64
singleMsgSuccessCount uint64
groupMsgCount uint64
}
func (mc *HistoryConsumerHandler) Init() {
statistics.NewStatistics(&mc.singleMsgCount, config.Config.ModuleName.MsgTransferName, "singleMsgCount insert to mongo ", 300)
statistics.NewStatistics(&mc.groupMsgCount, config.Config.ModuleName.MsgTransferName, "groupMsgCount insert to mongo ", 300)
statistics.NewStatistics(&mc.singleMsgSuccessCount, config.Config.ModuleName.MsgTransferName, fmt.Sprintf("%d second singleMsgCount insert to mongo", constant.StatisticsTimeInterval), constant.StatisticsTimeInterval)
statistics.NewStatistics(&mc.groupMsgCount, config.Config.ModuleName.MsgTransferName, fmt.Sprintf("%d second groupMsgCount insert to mongo", constant.StatisticsTimeInterval), constant.StatisticsTimeInterval)
mc.msgHandle = make(map[string]fcb)
mc.msgHandle[config.Config.Kafka.Ws2mschat.Topic] = mc.handleChatWs2Mongo
@ -59,10 +61,11 @@ func (mc *HistoryConsumerHandler) handleChatWs2Mongo(msg []byte, msgKey string)
if isHistory {
err := saveUserChat(msgKey, &msgFromMQ)
if err != nil {
mc.singleMsgFailedCount++
log.NewError(operationID, "single data insert to mongo err", err.Error(), msgFromMQ.String())
return
}
mc.singleMsgCount++
mc.singleMsgSuccessCount++
log.NewDebug(msgFromMQ.OperationID, "sendMessageToPush cost time ", time.Since(now))
}
if !isSenderSync && msgKey == msgFromMQ.MsgData.SendID {
@ -89,7 +92,6 @@ func (mc *HistoryConsumerHandler) handleChatWs2Mongo(msg []byte, msgKey string)
log.NewError(operationID, "single data insert to mongo err", err.Error(), msgFromMQ.String())
return
}
mc.singleMsgCount++
log.NewDebug(msgFromMQ.OperationID, "sendMessageToPush cost time ", time.Since(now))
}
if !isSenderSync && msgKey == msgFromMQ.MsgData.SendID {

@ -19,7 +19,7 @@ var (
pushCh PushConsumerHandler
pushTerminal []int32
producer *kafka.Producer
count uint64
successCount uint64
)
func Init(rpcPort int) {
@ -30,7 +30,7 @@ func Init(rpcPort int) {
}
func init() {
producer = kafka.NewKafkaProducer(config.Config.Kafka.Ws2mschat.Addr, config.Config.Kafka.Ws2mschat.Topic)
statistics.NewStatistics(&count, config.Config.ModuleName.PushName, fmt.Sprintf("%d second push to msg_gateway count", 300), 300)
statistics.NewStatistics(&successCount, config.Config.ModuleName.PushName, fmt.Sprintf("%d second push to msg_gateway count", constant.StatisticsTimeInterval), constant.StatisticsTimeInterval)
}
func Run() {

@ -53,7 +53,7 @@ func MsgToUser(pushMsg *pbPush.PushMsgReq) {
}
}
log.NewInfo(pushMsg.OperationID, "push_result", wsResult, "sendData", pushMsg.MsgData)
count++
successCount++
if isOfflinePush && pushMsg.PushToUserID != pushMsg.MsgData.SendID {
for _, v := range wsResult {
if v.ResultCode == 0 {

@ -267,3 +267,5 @@ func GroupIsBanPrivateChat(status int32) bool {
const BigVersion = "v3"
const LogFileName = "OpenIM.log"
const StatisticsTimeInterval = 300

@ -6,7 +6,7 @@ import (
)
type Statistics struct {
Count *uint64
AllCount *uint64
ModuleName string
PrintArgs string
SleepTime int
@ -17,16 +17,16 @@ func (s *Statistics) output() {
defer t.Stop()
var sum uint64
for {
sum = *s.Count
sum = *s.AllCount
select {
case <-t.C:
}
log.NewWarn("", " system stat ", s.ModuleName, s.PrintArgs, *s.Count-sum, "total:", *s.Count)
log.NewWarn("", " system stat ", s.ModuleName, s.PrintArgs, *s.AllCount-sum, "total:", *s.AllCount)
}
}
func NewStatistics(count *uint64, moduleName, printArgs string, sleepTime int) *Statistics {
p := &Statistics{Count: count, ModuleName: moduleName, SleepTime: sleepTime, PrintArgs: printArgs}
func NewStatistics(allCount *uint64, moduleName, printArgs string, sleepTime int) *Statistics {
p := &Statistics{AllCount: allCount, ModuleName: moduleName, SleepTime: sleepTime, PrintArgs: printArgs}
go p.output()
return p
}

Loading…
Cancel
Save