send message

pull/232/head v2.0.10
Gordon 3 years ago
parent 91fe549c25
commit 381e72f1ef

@ -39,15 +39,13 @@ func (och *OnlineHistoryConsumerHandler) Init(cmdCh chan Cmd2Value) {
} }
func (och *OnlineHistoryConsumerHandler) TriggerCmd(status int) { func (och *OnlineHistoryConsumerHandler) TriggerCmd(status int) {
operationID := utils.OperationIDGenerator() operationID := utils.OperationIDGenerator()
for {
err := sendCmd(och.cmdCh, Cmd2Value{Cmd: status, Value: ""}, 1) err := sendCmd(och.cmdCh, Cmd2Value{Cmd: status, Value: ""}, 1)
if err != nil { if err != nil {
log.Error(operationID, "TriggerCmd failed ", err.Error(), status) log.Error(operationID, "TriggerCmd failed ", err.Error(), status)
continue
}
log.Debug(operationID, "TriggerCmd success", status)
return return
} }
log.Debug(operationID, "TriggerCmd success", status)
} }
func sendCmd(ch chan Cmd2Value, value Cmd2Value, timeout int64) error { func sendCmd(ch chan Cmd2Value, value Cmd2Value, timeout int64) error {
var flag = 0 var flag = 0

@ -24,6 +24,9 @@ import (
"time" "time"
) )
//When the number of group members is greater than this valueOnline users will be sent firstGuaranteed service availability
const GroupMemberNum = 500
type MsgCallBackReq struct { type MsgCallBackReq struct {
SendID string `json:"sendID"` SendID string `json:"sendID"`
RecvID string `json:"recvID"` RecvID string `json:"recvID"`
@ -254,34 +257,32 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S
default: default:
} }
onUserIDList, offUserIDList := getOnlineAndOfflineUserIDList(memberUserIDList, pb.OperationID) m := make(map[string][]string, 2)
log.Debug(pb.OperationID, onUserIDList, offUserIDList) if len(memberUserIDList) > GroupMemberNum {
getOnlineAndOfflineUserIDList(memberUserIDList, m, pb.OperationID)
log.Debug(pb.OperationID, m[constant.OnlineStatus], m[constant.OfflineStatus])
} else {
m[constant.OnlineStatus] = memberUserIDList
}
//split parallel send //split parallel send
var wg sync.WaitGroup var wg sync.WaitGroup
var sendTag bool var sendTag bool
var split = 50 var split = 50
remain := len(onUserIDList) % split for k, v := range m {
for i := 0; i < len(onUserIDList)/split; i++ { remain := len(v) % split
for i := 0; i < len(v)/split; i++ {
wg.Add(1) wg.Add(1)
go rpc.sendMsgToGroup(onUserIDList[i*split:(i+1)*split], *pb, constant.OnlineStatus, &sendTag, &wg) go rpc.sendMsgToGroup(v[i*split:(i+1)*split], *pb, k, &sendTag, &wg)
} }
if remain > 0 { if remain > 0 {
wg.Add(1) wg.Add(1)
go rpc.sendMsgToGroup(onUserIDList[split*(len(onUserIDList)/split):], *pb, constant.OnlineStatus, &sendTag, &wg) go rpc.sendMsgToGroup(v[split*(len(v)/split):], *pb, k, &sendTag, &wg)
} }
wg.Wait()
remain = len(offUserIDList) % split
for i := 0; i < len(offUserIDList)/split; i++ {
wg.Add(1)
go rpc.sendMsgToGroup(offUserIDList[i*split:(i+1)*split], *pb, constant.OfflineStatus, &sendTag, &wg)
} }
if remain > 0 {
wg.Add(1) wg.Add(1)
go rpc.sendMsgToGroup(offUserIDList[split*(len(offUserIDList)/split):], *pb, constant.OfflineStatus, &sendTag, &wg) go rpc.sendMsgToGroup(addUidList, *pb, constant.OnlineStatus, &sendTag, &wg)
}
wg.Wait() wg.Wait()
wg.Add(1)
rpc.sendMsgToGroup(addUidList, *pb, constant.OnlineStatus, &sendTag, &wg)
// callback // callback
if err := callbackAfterSendGroupMsg(pb); err != nil { if err := callbackAfterSendGroupMsg(pb); err != nil {
log.NewError(pb.OperationID, utils.GetSelfFuncName(), "callbackAfterSendGroupMsg failed", err.Error()) log.NewError(pb.OperationID, utils.GetSelfFuncName(), "callbackAfterSendGroupMsg failed", err.Error())
@ -695,7 +696,8 @@ func Notification(n *NotificationMsg) {
log.NewError(req.OperationID, "SendMsg rpc failed, ", req.String(), reply.ErrCode, reply.ErrMsg) log.NewError(req.OperationID, "SendMsg rpc failed, ", req.String(), reply.ErrCode, reply.ErrMsg)
} }
} }
func getOnlineAndOfflineUserIDList(memberList []string, operationID string) (onllUserIDList []string, offlUserIDList []string) { func getOnlineAndOfflineUserIDList(memberList []string, m map[string][]string, operationID string) {
var onllUserIDList, offlUserIDList []string
var wsResult []*pbRelay.GetUsersOnlineStatusResp_SuccessResult var wsResult []*pbRelay.GetUsersOnlineStatusResp_SuccessResult
req := &pbRelay.GetUsersOnlineStatusReq{} req := &pbRelay.GetUsersOnlineStatusReq{}
req.UserIDList = memberList req.UserIDList = memberList
@ -731,7 +733,8 @@ func getOnlineAndOfflineUserIDList(memberList []string, operationID string) (onl
offlUserIDList = append(offlUserIDList, v1) offlUserIDList = append(offlUserIDList, v1)
} }
} }
return onllUserIDList, offlUserIDList m[constant.OnlineStatus] = onllUserIDList
m[constant.OfflineStatus] = offlUserIDList
} }
func (rpc *rpcChat) sendMsgToGroup(list []string, pb pbChat.SendMsgReq, status string, sendTag *bool, wg *sync.WaitGroup) { func (rpc *rpcChat) sendMsgToGroup(list []string, pb pbChat.SendMsgReq, status string, sendTag *bool, wg *sync.WaitGroup) {

Loading…
Cancel
Save