@ -23,6 +23,7 @@ import (
"sync"
"time"
promePkg "Open_IM/pkg/common/prometheus"
go_redis "github.com/go-redis/redis/v8"
"github.com/golang/protobuf/proto"
)
@ -270,6 +271,7 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S
}
switch pb . MsgData . SessionType {
case constant . SingleChatType :
promePkg . PromeInc ( promePkg . SingleChatMsgRecvSuccessCounter )
// callback
t1 = time . Now ( )
callbackResp := callbackBeforeSendSingleMsg ( pb )
@ -282,6 +284,7 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S
callbackResp . ErrCode = 201
}
log . NewDebug ( pb . OperationID , utils . GetSelfFuncName ( ) , "callbackBeforeSendSingleMsg result" , "end rpc and return" , callbackResp )
promePkg . PromeInc ( promePkg . SingleChatMsgProcessFailedCounter )
return returnMsg ( & replay , pb , int32 ( callbackResp . ErrCode ) , callbackResp . ErrMsg , "" , 0 )
}
t1 = time . Now ( )
@ -295,6 +298,7 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S
log . Info ( pb . OperationID , "sendMsgToKafka " , " cost time: " , time . Since ( t1 ) )
if err1 != nil {
log . NewError ( msgToMQSingle . OperationID , "kafka send msg err :RecvID" , msgToMQSingle . MsgData . RecvID , msgToMQSingle . String ( ) , err1 . Error ( ) )
promePkg . PromeInc ( promePkg . SingleChatMsgProcessFailedCounter )
return returnMsg ( & replay , pb , 201 , "kafka send msg err" , "" , 0 )
}
}
@ -304,6 +308,7 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S
log . Info ( pb . OperationID , "sendMsgToKafka " , " cost time: " , time . Since ( t1 ) )
if err2 != nil {
log . NewError ( msgToMQSingle . OperationID , "kafka send msg err:SendID" , msgToMQSingle . MsgData . SendID , msgToMQSingle . String ( ) )
promePkg . PromeInc ( promePkg . SingleChatMsgProcessFailedCounter )
return returnMsg ( & replay , pb , 201 , "kafka send msg err" , "" , 0 )
}
}
@ -315,9 +320,11 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S
log . NewError ( pb . OperationID , utils . GetSelfFuncName ( ) , "callbackAfterSendSingleMsg resp: " , callbackResp )
}
log . Debug ( pb . OperationID , "send msg cost time all: " , db . GetCurrentTimestampByMill ( ) - newTime , pb . MsgData . ClientMsgID )
promePkg . PromeInc ( promePkg . SingleChatMsgProcessSuccessCounter )
return returnMsg ( & replay , pb , 0 , "" , msgToMQSingle . MsgData . ServerMsgID , msgToMQSingle . MsgData . SendTime )
case constant . GroupChatType :
// callback
promePkg . PromeInc ( promePkg . GroupChatMsgRecvSuccessCounter )
callbackResp := callbackBeforeSendGroupMsg ( pb )
if callbackResp . ErrCode != 0 {
log . NewError ( pb . OperationID , utils . GetSelfFuncName ( ) , "callbackBeforeSendGroupMsg resp:" , callbackResp )
@ -327,10 +334,12 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S
callbackResp . ErrCode = 201
}
log . NewDebug ( pb . OperationID , utils . GetSelfFuncName ( ) , "callbackBeforeSendSingleMsg result" , "end rpc and return" , callbackResp )
promePkg . PromeInc ( promePkg . GroupChatMsgProcessFailedCounter )
return returnMsg ( & replay , pb , int32 ( callbackResp . ErrCode ) , callbackResp . ErrMsg , "" , 0 )
}
var memberUserIDList [ ] string
if flag , errCode , errMsg , memberUserIDList = messageVerification ( pb ) ; ! flag {
promePkg . PromeInc ( promePkg . GroupChatMsgProcessFailedCounter )
return returnMsg ( & replay , pb , errCode , errMsg , "" , 0 )
}
log . Debug ( pb . OperationID , "GetGroupAllMember userID list" , memberUserIDList , "len: " , len ( memberUserIDList ) )
@ -395,6 +404,7 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S
}
if ! sendTag {
log . NewWarn ( pb . OperationID , "send tag is " , sendTag )
promePkg . PromeInc ( promePkg . GroupChatMsgProcessFailedCounter )
return returnMsg ( & replay , pb , 201 , "kafka send msg err" , "" , 0 )
} else {
if pb . MsgData . ContentType == constant . AtText {
@ -459,6 +469,7 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S
} ( )
}
log . Debug ( pb . OperationID , "send msg cost time3 " , db . GetCurrentTimestampByMill ( ) - newTime , pb . MsgData . ClientMsgID )
promePkg . PromeInc ( promePkg . GroupChatMsgProcessSuccessCounter )
return returnMsg ( & replay , pb , 0 , "" , msgToMQSingle . MsgData . ServerMsgID , msgToMQSingle . MsgData . SendTime )
}
case constant . NotificationChatType :
@ -481,6 +492,7 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S
log . Debug ( pb . OperationID , "send msg cost time " , db . GetCurrentTimestampByMill ( ) - newTime , pb . MsgData . ClientMsgID )
return returnMsg ( & replay , pb , 0 , "" , msgToMQSingle . MsgData . ServerMsgID , msgToMQSingle . MsgData . SendTime )
case constant . SuperGroupChatType :
promePkg . PromeInc ( promePkg . WorkSuperGroupChatMsgRecvSuccessCounter )
// callback
callbackResp := callbackBeforeSendGroupMsg ( pb )
if callbackResp . ErrCode != 0 {
@ -490,10 +502,12 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S
if callbackResp . ErrCode == 0 {
callbackResp . ErrCode = 201
}
promePkg . PromeInc ( promePkg . WorkSuperGroupChatMsgProcessFailedCounter )
log . NewDebug ( pb . OperationID , utils . GetSelfFuncName ( ) , "callbackBeforeSendSuperGroupMsg result" , "end rpc and return" , callbackResp )
return returnMsg ( & replay , pb , int32 ( callbackResp . ErrCode ) , callbackResp . ErrMsg , "" , 0 )
}
if flag , errCode , errMsg , _ = messageVerification ( pb ) ; ! flag {
promePkg . PromeInc ( promePkg . WorkSuperGroupChatMsgProcessFailedCounter )
return returnMsg ( & replay , pb , errCode , errMsg , "" , 0 )
}
msgToMQSingle . MsgData = pb . MsgData
@ -501,6 +515,7 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S
err1 := rpc . sendMsgToKafka ( & msgToMQSingle , msgToMQSingle . MsgData . GroupID , constant . OnlineStatus )
if err1 != nil {
log . NewError ( msgToMQSingle . OperationID , "kafka send msg err:RecvID" , msgToMQSingle . MsgData . RecvID , msgToMQSingle . String ( ) )
promePkg . PromeInc ( promePkg . WorkSuperGroupChatMsgProcessFailedCounter )
return returnMsg ( & replay , pb , 201 , "kafka send msg err" , "" , 0 )
}
// callback
@ -508,6 +523,7 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S
if callbackResp . ErrCode != 0 {
log . NewError ( pb . OperationID , utils . GetSelfFuncName ( ) , "callbackAfterSendSuperGroupMsg resp: " , callbackResp )
}
promePkg . PromeInc ( promePkg . WorkSuperGroupChatMsgProcessSuccessCounter )
return returnMsg ( & replay , pb , 0 , "" , msgToMQSingle . MsgData . ServerMsgID , msgToMQSingle . MsgData . SendTime )
default :