diff --git a/internal/rpc/office/office.go b/internal/rpc/office/office.go index 816c272c7..753fca492 100644 --- a/internal/rpc/office/office.go +++ b/internal/rpc/office/office.go @@ -33,7 +33,7 @@ type officeServer struct { func NewOfficeServer(port int) *officeServer { log.NewPrivateLog(constant.LogFileName) - ch := make(chan tagSendStruct, 10000) + ch := make(chan tagSendStruct, 100000) return &officeServer{ rpcPort: port, rpcRegisterName: config.Config.RpcRegisterName.OpenImOfficeName, @@ -106,7 +106,7 @@ func (s *officeServer) sendTagMsgRoutine() { select { case v := <-s.ch: msg.TagSendMessage(v.operationID, v.user, v.userID, v.content, v.senderPlatformID) - time.Sleep(time.Millisecond * 500) + time.Sleep(time.Millisecond * 100) } } } @@ -251,7 +251,7 @@ func (s *officeServer) SendMsg2Tag(_ context.Context, req *pbOffice.SendMsg2TagR resp.CommonResp.ErrCode = constant.ErrDB.ErrCode return resp, nil } - + var successUserIDList []string for _, userID := range userIDList { t := tagSendStruct{ operationID: req.OperationID, @@ -263,6 +263,7 @@ func (s *officeServer) SendMsg2Tag(_ context.Context, req *pbOffice.SendMsg2TagR select { case s.ch <- t: log.NewDebug(t.operationID, utils.GetSelfFuncName(), "msg: ", t, "send success") + successUserIDList = append(successUserIDList, userID) // if channel is full, return grpc req case <-time.After(1 * time.Second): log.NewError(t.operationID, utils.GetSelfFuncName(), s.ch, "channel is full") @@ -274,9 +275,9 @@ func (s *officeServer) SendMsg2Tag(_ context.Context, req *pbOffice.SendMsg2TagR var tagSendLogs db.TagSendLog var wg sync.WaitGroup - wg.Add(len(userIDList)) + wg.Add(len(successUserIDList)) var lock sync.Mutex - for _, userID := range userIDList { + for _, userID := range successUserIDList { go func(userID string) { defer wg.Done() userName, err := im_mysql_model.GetUserNameByUserID(userID)