From 313cf858f065868991e8ae495453fcb016ddf629 Mon Sep 17 00:00:00 2001 From: skiffer-git <44203734@qq.com> Date: Wed, 18 May 2022 11:52:15 +0800 Subject: [PATCH 1/7] Statistics --- internal/msg_gateway/gate/init.go | 2 ++ internal/msg_gateway/gate/logic.go | 2 ++ pkg/common/constant/constant.go | 2 +- 3 files changed, 5 insertions(+), 1 deletion(-) diff --git a/internal/msg_gateway/gate/init.go b/internal/msg_gateway/gate/init.go index 5cf682b84..60c97f1de 100644 --- a/internal/msg_gateway/gate/init.go +++ b/internal/msg_gateway/gate/init.go @@ -19,6 +19,8 @@ var ( sendMsgFailedCount uint64 sendMsgSuccessCount uint64 userCount uint64 + + sendMsgAllCountLock sync.RWMutex ) func Init(rpcPort, wsPort int) { diff --git a/internal/msg_gateway/gate/logic.go b/internal/msg_gateway/gate/logic.go index 51955f0fe..5644f1cb3 100644 --- a/internal/msg_gateway/gate/logic.go +++ b/internal/msg_gateway/gate/logic.go @@ -143,7 +143,9 @@ func (ws *WServer) pullMsgBySeqListResp(conn *UserConn, m *Req, pb *sdk_ws.PullM } func (ws *WServer) sendMsgReq(conn *UserConn, m *Req) { + sendMsgAllCountLock.Lock() sendMsgAllCount++ + sendMsgAllCountLock.Unlock() log.NewInfo(m.OperationID, "Ws call success to sendMsgReq start", m.MsgIncr, m.ReqIdentifier, m.SendID, m.Data) nReply := new(pbChat.SendMsgResp) isPass, errCode, errMsg, pData := ws.argsValidate(m, constant.WSSendMsg) diff --git a/pkg/common/constant/constant.go b/pkg/common/constant/constant.go index 33e350003..032108ee9 100644 --- a/pkg/common/constant/constant.go +++ b/pkg/common/constant/constant.go @@ -272,4 +272,4 @@ const BigVersion = "v3" const LogFileName = "OpenIM.log" -const StatisticsTimeInterval = 300 +const StatisticsTimeInterval = 60 From 681cf349e66aba1ef115dc86085c1d901587e781 Mon Sep 17 00:00:00 2001 From: wangchuxiao Date: Wed, 18 May 2022 11:53:57 +0800 Subject: [PATCH 2/7] organization --- internal/api/office/tag.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/internal/api/office/tag.go b/internal/api/office/tag.go index c187ecef5..02278f60e 100644 --- a/internal/api/office/tag.go +++ b/internal/api/office/tag.go @@ -11,6 +11,7 @@ import ( "Open_IM/pkg/utils" "context" "github.com/gin-gonic/gin" + "google.golang.org/grpc" "net/http" "strings" ) @@ -260,7 +261,8 @@ func GetTagSendLogs(c *gin.Context) { } etcdConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImOfficeName) client := pbOffice.NewOfficeServiceClient(etcdConn) - respPb, err := client.GetTagSendLogs(context.Background(), &reqPb) + maxSizeOption := grpc.MaxCallRecvMsgSize(1024 * 1024 * 20) + respPb, err := client.GetTagSendLogs(context.Background(), &reqPb, maxSizeOption) if err != nil { log.NewError(req.OperationID, utils.GetSelfFuncName(), "GetTagSendLogs failed", err.Error()) c.JSON(http.StatusInternalServerError, gin.H{"errCode": 500, "errMsg": "GetTagSendLogs rpc server failed" + err.Error()}) From 0e03fb6393cca5df7311cb28fd432a1d335c2b82 Mon Sep 17 00:00:00 2001 From: wangchuxiao Date: Wed, 18 May 2022 12:51:33 +0800 Subject: [PATCH 3/7] organization --- internal/rpc/office/office.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/internal/rpc/office/office.go b/internal/rpc/office/office.go index 1cade65eb..91b8ab669 100644 --- a/internal/rpc/office/office.go +++ b/internal/rpc/office/office.go @@ -240,6 +240,7 @@ func (s *officeServer) SendMsg2Tag(_ context.Context, req *pbOffice.SendMsg2TagR var tagSendLogs db.TagSendLog wg.Add(len(userIDList)) + var lock sync.Mutex for _, userID := range userIDList { go func(userID string) { defer wg.Done() @@ -248,10 +249,12 @@ func (s *officeServer) SendMsg2Tag(_ context.Context, req *pbOffice.SendMsg2TagR log.NewError(req.OperationID, utils.GetSelfFuncName(), "GetUserNameByUserID failed", err.Error()) return } + lock.Lock() tagSendLogs.UserList = append(tagSendLogs.UserList, db.TagUser{ UserID: userID, UserName: userName, }) + lock.Unlock() }(userID) } wg.Wait() From 888c837cddfe12ad7f00bff4aa8c67be8e807c2d Mon Sep 17 00:00:00 2001 From: wangchuxiao Date: Wed, 18 May 2022 15:10:44 +0800 Subject: [PATCH 4/7] office tag async send --- internal/rpc/office/office.go | 49 +++++++++++++++++++++++++++++------ 1 file changed, 41 insertions(+), 8 deletions(-) diff --git a/internal/rpc/office/office.go b/internal/rpc/office/office.go index 91b8ab669..016e6278f 100644 --- a/internal/rpc/office/office.go +++ b/internal/rpc/office/office.go @@ -28,15 +28,18 @@ type officeServer struct { rpcRegisterName string etcdSchema string etcdAddr []string + ch chan tagSendStruct } func NewOfficeServer(port int) *officeServer { log.NewPrivateLog(constant.LogFileName) + ch := make(chan tagSendStruct, 10000) return &officeServer{ rpcPort: port, rpcRegisterName: config.Config.RpcRegisterName.OpenImOfficeName, etcdSchema: config.Config.Etcd.EtcdSchema, etcdAddr: config.Config.Etcd.EtcdAddr, + ch: ch, } } @@ -80,6 +83,7 @@ func (s *officeServer) Run() { log.NewError("0", "RegisterEtcd failed ", err.Error()) return } + go s.sendTagMsgRoutine() err = srv.Serve(listener) if err != nil { log.NewError("0", "Serve failed ", err.Error()) @@ -88,6 +92,23 @@ func (s *officeServer) Run() { log.NewInfo("0", "message cms rpc success") } +type tagSendStruct struct { + operationID string + user *db.User + userID string + content string + senderPlatformID int32 +} + +func (s *officeServer) sendTagMsgRoutine() { + log.NewInfo("", utils.GetSelfFuncName(), "start") + select { + case v := <-s.ch: + msg.TagSendMessage(v.operationID, v.user, v.userID, v.content, v.senderPlatformID) + time.Sleep(time.Millisecond * 500) + } +} + func (s *officeServer) GetUserTags(_ context.Context, req *pbOffice.GetUserTagsReq) (resp *pbOffice.GetUserTagsResp, err error) { log.NewInfo(req.OperationID, utils.GetSelfFuncName(), "req ", req.String()) resp = &pbOffice.GetUserTagsResp{ @@ -228,17 +249,29 @@ func (s *officeServer) SendMsg2Tag(_ context.Context, req *pbOffice.SendMsg2TagR resp.CommonResp.ErrCode = constant.ErrDB.ErrCode return resp, nil } - var wg sync.WaitGroup - wg.Add(len(userIDList)) + for _, userID := range userIDList { - go func(userID string) { - defer wg.Done() - msg.TagSendMessage(req.OperationID, user, userID, req.Content, req.SenderPlatformID) - }(userID) + t := tagSendStruct{ + operationID: req.OperationID, + user: user, + userID: userID, + content: req.Content, + senderPlatformID: 0, + } + select { + case s.ch <- t: + log.NewDebug(t.operationID, utils.GetSelfFuncName(), "msg: ", t, "send success") + // if channel is full, return grpc req + case <-time.After(1 * time.Second): + log.NewError(t.operationID, utils.GetSelfFuncName(), s.ch, "channel is full") + resp.CommonResp.ErrCode = constant.ErrSendLimit.ErrCode + resp.CommonResp.ErrMsg = constant.ErrSendLimit.ErrMsg + return resp, nil + } } - wg.Wait() - var tagSendLogs db.TagSendLog + var tagSendLogs db.TagSendLog + var wg sync.WaitGroup wg.Add(len(userIDList)) var lock sync.Mutex for _, userID := range userIDList { From 9913e679282c0c89341517be21de14ef0ff9a822 Mon Sep 17 00:00:00 2001 From: wangchuxiao Date: Wed, 18 May 2022 15:16:44 +0800 Subject: [PATCH 5/7] office tag async send --- internal/rpc/office/office.go | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/internal/rpc/office/office.go b/internal/rpc/office/office.go index 016e6278f..93b2d1d2f 100644 --- a/internal/rpc/office/office.go +++ b/internal/rpc/office/office.go @@ -102,10 +102,12 @@ type tagSendStruct struct { func (s *officeServer) sendTagMsgRoutine() { log.NewInfo("", utils.GetSelfFuncName(), "start") - select { - case v := <-s.ch: - msg.TagSendMessage(v.operationID, v.user, v.userID, v.content, v.senderPlatformID) - time.Sleep(time.Millisecond * 500) + for { + select { + case v := <-s.ch: + msg.TagSendMessage(v.operationID, v.user, v.userID, v.content, v.senderPlatformID) + time.Sleep(time.Millisecond * 500) + } } } From b0e349212d26617acd4da7877a6c30fd703c5859 Mon Sep 17 00:00:00 2001 From: wangchuxiao Date: Wed, 18 May 2022 15:25:17 +0800 Subject: [PATCH 6/7] office tag async send --- internal/rpc/office/office.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/rpc/office/office.go b/internal/rpc/office/office.go index 93b2d1d2f..816c272c7 100644 --- a/internal/rpc/office/office.go +++ b/internal/rpc/office/office.go @@ -258,7 +258,7 @@ func (s *officeServer) SendMsg2Tag(_ context.Context, req *pbOffice.SendMsg2TagR user: user, userID: userID, content: req.Content, - senderPlatformID: 0, + senderPlatformID: req.SenderPlatformID, } select { case s.ch <- t: @@ -281,7 +281,7 @@ func (s *officeServer) SendMsg2Tag(_ context.Context, req *pbOffice.SendMsg2TagR defer wg.Done() userName, err := im_mysql_model.GetUserNameByUserID(userID) if err != nil { - log.NewError(req.OperationID, utils.GetSelfFuncName(), "GetUserNameByUserID failed", err.Error()) + log.NewError(req.OperationID, utils.GetSelfFuncName(), "GetUserNameByUserID failed", err.Error(), userID) return } lock.Lock() From 18a61061e1071f1300e5dbc673503aeba6e201b3 Mon Sep 17 00:00:00 2001 From: wangchuxiao Date: Wed, 18 May 2022 15:48:42 +0800 Subject: [PATCH 7/7] office tag async send --- internal/rpc/office/office.go | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/internal/rpc/office/office.go b/internal/rpc/office/office.go index 816c272c7..753fca492 100644 --- a/internal/rpc/office/office.go +++ b/internal/rpc/office/office.go @@ -33,7 +33,7 @@ type officeServer struct { func NewOfficeServer(port int) *officeServer { log.NewPrivateLog(constant.LogFileName) - ch := make(chan tagSendStruct, 10000) + ch := make(chan tagSendStruct, 100000) return &officeServer{ rpcPort: port, rpcRegisterName: config.Config.RpcRegisterName.OpenImOfficeName, @@ -106,7 +106,7 @@ func (s *officeServer) sendTagMsgRoutine() { select { case v := <-s.ch: msg.TagSendMessage(v.operationID, v.user, v.userID, v.content, v.senderPlatformID) - time.Sleep(time.Millisecond * 500) + time.Sleep(time.Millisecond * 100) } } } @@ -251,7 +251,7 @@ func (s *officeServer) SendMsg2Tag(_ context.Context, req *pbOffice.SendMsg2TagR resp.CommonResp.ErrCode = constant.ErrDB.ErrCode return resp, nil } - + var successUserIDList []string for _, userID := range userIDList { t := tagSendStruct{ operationID: req.OperationID, @@ -263,6 +263,7 @@ func (s *officeServer) SendMsg2Tag(_ context.Context, req *pbOffice.SendMsg2TagR select { case s.ch <- t: log.NewDebug(t.operationID, utils.GetSelfFuncName(), "msg: ", t, "send success") + successUserIDList = append(successUserIDList, userID) // if channel is full, return grpc req case <-time.After(1 * time.Second): log.NewError(t.operationID, utils.GetSelfFuncName(), s.ch, "channel is full") @@ -274,9 +275,9 @@ func (s *officeServer) SendMsg2Tag(_ context.Context, req *pbOffice.SendMsg2TagR var tagSendLogs db.TagSendLog var wg sync.WaitGroup - wg.Add(len(userIDList)) + wg.Add(len(successUserIDList)) var lock sync.Mutex - for _, userID := range userIDList { + for _, userID := range successUserIDList { go func(userID string) { defer wg.Done() userName, err := im_mysql_model.GetUserNameByUserID(userID)