add batch push

pull/236/head
Gordon 2 years ago
parent ed1e22bac5
commit 1d4dc81e55

@ -7,6 +7,7 @@ import (
"Open_IM/pkg/common/token_verify"
"Open_IM/pkg/grpc-etcdv3/getcdv3"
pbRelay "Open_IM/pkg/proto/relay"
sdk_ws "Open_IM/pkg/proto/sdk_ws"
"Open_IM/pkg/utils"
"bytes"
"context"
@ -143,30 +144,78 @@ func (r *RPCServer) GetUsersOnlineStatus(_ context.Context, req *pbRelay.GetUser
return &resp, nil
}
//func (r *RPCServer) OnlineBatchPushOneMsg(_ context.Context, req *pbRelay.OnlineBatchPushOneMsgReq) (*pbRelay.OnlineBatchPushOneMsgResp, error) {
// log.NewInfo(req.OperationID, "BatchPushMsgToUser is arriving", req.String())
// var singleUserResult []*pbRelay.SingelMsgToUserResultList
// //r.GetBatchMsgForPush(req.OperationID,req.MsgData,req.PushToUserIDList,)
// msgBytes, _ := proto.Marshal(req.MsgData)
// mReply := Resp{
// ReqIdentifier: constant.WSPushMsg,
// OperationID: req.OperationID,
// Data: msgBytes,
// }
// var replyBytes bytes.Buffer
// enc := gob.NewEncoder(&replyBytes)
// err := enc.Encode(mReply)
// if err != nil {
// log.NewError(req.OperationID, "data encode err", err.Error())
// }
// for _, v := range req.PushToUserIDList {
// var resp []*pbRelay.SingleMsgToUserPlatform
// tempT := &pbRelay.SingelMsgToUserResultList{
// UserID: v,
// }
// userConnMap := ws.getUserAllCons(v)
// for platform, userConn := range userConnMap {
// if userConn != nil {
// resultCode := sendMsgBatchToUser(userConn, replyBytes.Bytes(), req, platform, v)
// if resultCode == 0 && utils.IsContainInt(platform, r.pushTerminal) {
// tempT.OnlinePush = true
// log.Info(req.OperationID, "PushSuperMsgToUser is success By Ws", "args", req.String(), "recvPlatForm", constant.PlatformIDToName(platform), "recvID", v)
// temp := &pbRelay.SingleMsgToUserPlatform{
// ResultCode: resultCode,
// RecvID: v,
// RecvPlatFormID: int32(platform),
// }
// resp = append(resp, temp)
// }
//
// }
// }
// tempT.Resp = resp
// singleUserResult = append(singleUserResult, tempT)
//
// }
//
// return &pbRelay.OnlineBatchPushOneMsgResp{
// SinglePushResult: singleUserResult,
// }, nil
//}
func (r *RPCServer) OnlineBatchPushOneMsg(_ context.Context, req *pbRelay.OnlineBatchPushOneMsgReq) (*pbRelay.OnlineBatchPushOneMsgResp, error) {
log.NewInfo(req.OperationID, "BatchPushMsgToUser is arriving", req.String())
var singleUserResult []*pbRelay.SingelMsgToUserResultList
msgBytes, _ := proto.Marshal(req.MsgData)
mReply := Resp{
ReqIdentifier: constant.WSPushMsg,
OperationID: req.OperationID,
Data: msgBytes,
}
var replyBytes bytes.Buffer
enc := gob.NewEncoder(&replyBytes)
err := enc.Encode(mReply)
if err != nil {
log.NewError(req.OperationID, "data encode err", err.Error())
}
for _, v := range req.PushToUserIDList {
var resp []*pbRelay.SingleMsgToUserPlatform
tempT := &pbRelay.SingelMsgToUserResultList{
UserID: v,
}
userConnMap := ws.getUserAllCons(v)
for platform, userConn := range userConnMap {
if userConn != nil {
resultCode := sendMsgBatchToUser(userConn, replyBytes.Bytes(), req, platform, v)
var platformList []int
for k, _ := range userConnMap {
platformList = append(platformList, k)
}
needPushMapList := r.GetSingleUserMsgForPushPlatforms(req.OperationID, req.MsgData, v, platformList)
for platform, list := range needPushMapList {
if list != nil {
for _, v := range list {
req.MsgData.MsgDataList = append(req.MsgData.MsgDataList, v)
}
replyBytes, err := r.encodeWsData(req.MsgData, req.OperationID)
if err != nil {
continue
}
resultCode := sendMsgBatchToUser(userConnMap[platform], replyBytes.Bytes(), req, platform, v)
if resultCode == 0 && utils.IsContainInt(platform, r.pushTerminal) {
tempT.OnlinePush = true
log.Info(req.OperationID, "PushSuperMsgToUser is success By Ws", "args", req.String(), "recvPlatForm", constant.PlatformIDToName(platform), "recvID", v)
@ -178,17 +227,42 @@ func (r *RPCServer) OnlineBatchPushOneMsg(_ context.Context, req *pbRelay.Online
resp = append(resp, temp)
}
} else {
if utils.IsContainInt(platform, r.pushTerminal) {
tempT.OnlinePush = true
temp := &pbRelay.SingleMsgToUserPlatform{
ResultCode: 0,
RecvID: v,
RecvPlatFormID: int32(platform),
}
resp = append(resp, temp)
}
}
}
tempT.Resp = resp
singleUserResult = append(singleUserResult, tempT)
}
return &pbRelay.OnlineBatchPushOneMsgResp{
SinglePushResult: singleUserResult,
}, nil
}
func (r *RPCServer) encodeWsData(wsData *sdk_ws.MsgData, operationID string) (bytes.Buffer, error) {
msgBytes, _ := proto.Marshal(wsData)
mReply := Resp{
ReqIdentifier: constant.WSPushMsg,
OperationID: operationID,
Data: msgBytes,
}
var replyBytes bytes.Buffer
enc := gob.NewEncoder(&replyBytes)
err := enc.Encode(mReply)
if err != nil {
log.NewError(operationID, "data encode err", err.Error())
return bytes.Buffer{}, err
}
return replyBytes, nil
}
func (r *RPCServer) KickUserOffline(_ context.Context, req *pbRelay.KickUserOfflineReq) (*pbRelay.KickUserOfflineResp, error) {
log.NewInfo(req.OperationID, "KickUserOffline is arriving", req.String())
for _, v := range req.KickUserIDList {

Loading…
Cancel
Save