update waitgroup to errgroup.

pull/2466/head
Monet Lee 1 year ago
parent 3e547958b0
commit 07eb0cb387

@ -3,12 +3,10 @@ package msg
import (
"context"
"strings"
"sync"
"time"
"github.com/openimsdk/open-im-server/v3/pkg/authverify"
"github.com/openimsdk/open-im-server/v3/pkg/common/convert"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"github.com/openimsdk/protocol/conversation"
pbconversation "github.com/openimsdk/protocol/conversation"
"github.com/openimsdk/protocol/msg"
@ -18,6 +16,7 @@ import (
"github.com/openimsdk/tools/mcontext"
"github.com/openimsdk/tools/utils/idutil"
"github.com/openimsdk/tools/utils/stringutil"
"golang.org/x/sync/errgroup"
)
// hard delete in Database.
@ -94,16 +93,15 @@ func (m *msgServer) DestructMsgs(ctx context.Context, req *msg.DestructMsgsReq)
temp := convert.ConversationsPb2DB(req.Conversations)
batchNum := 100
var wg sync.WaitGroup
wg.Add((len(temp) + batchNum - 1) / batchNum)
errg, _ := errgroup.WithContext(ctx)
errg.SetLimit(100)
for i := 0; i < len(temp); i += batchNum {
batch := temp[i:min(i+batchNum, len(temp))]
go func(batch []*model.Conversation) {
defer wg.Done()
for _, conversation := range temp {
errg.Go(func() error {
for _, conversation := range batch {
handleCtx := mcontext.NewCtx(stringutil.GetSelfFuncName() + "-" + idutil.OperationIDGenerator() + "-" + conversation.ConversationID + "-" + conversation.OwnerUserID)
log.ZDebug(handleCtx, "User MsgsDestruct",
"conversationID", conversation.ConversationID,
@ -131,10 +129,8 @@ func (m *msgServer) DestructMsgs(ctx context.Context, req *msg.DestructMsgsReq)
// m.msgNotificationSender.UserDeleteMsgsNotification(handleCtx, conversation.OwnerUserID, conversation.ConversationID, seqs)
}
}
}(batch)
return nil
})
}
wg.Wait()
return nil, nil
}

Loading…
Cancel
Save