From 07eb0cb387d1d24f825c20132d6d8cf79e1347b7 Mon Sep 17 00:00:00 2001 From: Monet Lee Date: Thu, 1 Aug 2024 15:05:00 +0800 Subject: [PATCH] update waitgroup to errgroup. --- internal/rpc/msg/clear.go | 20 ++++++++------------ 1 file changed, 8 insertions(+), 12 deletions(-) diff --git a/internal/rpc/msg/clear.go b/internal/rpc/msg/clear.go index fd121318b..2b22ef513 100644 --- a/internal/rpc/msg/clear.go +++ b/internal/rpc/msg/clear.go @@ -3,12 +3,10 @@ package msg import ( "context" "strings" - "sync" "time" "github.com/openimsdk/open-im-server/v3/pkg/authverify" "github.com/openimsdk/open-im-server/v3/pkg/common/convert" - "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" "github.com/openimsdk/protocol/conversation" pbconversation "github.com/openimsdk/protocol/conversation" "github.com/openimsdk/protocol/msg" @@ -18,6 +16,7 @@ import ( "github.com/openimsdk/tools/mcontext" "github.com/openimsdk/tools/utils/idutil" "github.com/openimsdk/tools/utils/stringutil" + "golang.org/x/sync/errgroup" ) // hard delete in Database. @@ -94,16 +93,15 @@ func (m *msgServer) DestructMsgs(ctx context.Context, req *msg.DestructMsgsReq) temp := convert.ConversationsPb2DB(req.Conversations) batchNum := 100 - var wg sync.WaitGroup - wg.Add((len(temp) + batchNum - 1) / batchNum) + + errg, _ := errgroup.WithContext(ctx) + errg.SetLimit(100) for i := 0; i < len(temp); i += batchNum { batch := temp[i:min(i+batchNum, len(temp))] - go func(batch []*model.Conversation) { - defer wg.Done() - - for _, conversation := range temp { + errg.Go(func() error { + for _, conversation := range batch { handleCtx := mcontext.NewCtx(stringutil.GetSelfFuncName() + "-" + idutil.OperationIDGenerator() + "-" + conversation.ConversationID + "-" + conversation.OwnerUserID) log.ZDebug(handleCtx, "User MsgsDestruct", "conversationID", conversation.ConversationID, @@ -131,10 +129,8 @@ func (m *msgServer) DestructMsgs(ctx context.Context, req *msg.DestructMsgsReq) // m.msgNotificationSender.UserDeleteMsgsNotification(handleCtx, conversation.OwnerUserID, conversation.ConversationID, seqs) } } - }(batch) - + return nil + }) } - wg.Wait() - return nil, nil }