From 86b05e16b649bc1551ba9dd360674f477c4173a0 Mon Sep 17 00:00:00 2001 From: WhereAreBugs Date: Wed, 21 Jan 2026 16:24:28 +0800 Subject: [PATCH] bugfix(conversation):removed unexpectedly called functions and itself to avoid out of index query. --- internal/rpc/conversation/conversation.go | 62 ------------------- internal/rpc/msg/delete.go | 9 +-- pkg/common/storage/controller/conversation.go | 6 -- pkg/common/storage/database/conversation.go | 1 - .../storage/database/mgo/conversation.go | 14 +++-- pkg/rpcli/conversation.go | 13 +--- 6 files changed, 12 insertions(+), 93 deletions(-) diff --git a/internal/rpc/conversation/conversation.go b/internal/rpc/conversation/conversation.go index 707a67b14..b24a6908d 100644 --- a/internal/rpc/conversation/conversation.go +++ b/internal/rpc/conversation/conversation.go @@ -24,7 +24,6 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/redis" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database/mgo" - "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" dbModel "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" "github.com/openimsdk/open-im-server/v3/pkg/localcache" "github.com/openimsdk/open-im-server/v3/pkg/msgprocessor" @@ -506,17 +505,6 @@ func (c *conversationServer) GetUserConversationIDsHash(ctx context.Context, req return &pbconversation.GetUserConversationIDsHashResp{Hash: hash}, nil } -func (c *conversationServer) GetConversationsByConversationID( - ctx context.Context, - req *pbconversation.GetConversationsByConversationIDReq, -) (*pbconversation.GetConversationsByConversationIDResp, error) { - conversations, err := c.conversationDatabase.GetConversationsByConversationID(ctx, req.ConversationIDs) - if err != nil { - return nil, err - } - return &pbconversation.GetConversationsByConversationIDResp{Conversations: convert.ConversationsDB2Pb(conversations)}, nil -} - func (c *conversationServer) GetConversationOfflinePushUserIDs(ctx context.Context, req *pbconversation.GetConversationOfflinePushUserIDsReq) (*pbconversation.GetConversationOfflinePushUserIDsResp, error) { if req.ConversationID == "" { return nil, errs.ErrArgs.WrapMsg("conversationID is empty") @@ -708,56 +696,6 @@ func (c *conversationServer) GetOwnerConversation(ctx context.Context, req *pbco }, nil } -func (c *conversationServer) GetConversationsNeedClearMsg(ctx context.Context, _ *pbconversation.GetConversationsNeedClearMsgReq) (*pbconversation.GetConversationsNeedClearMsgResp, error) { - num, err := c.conversationDatabase.GetAllConversationIDsNumber(ctx) - if err != nil { - log.ZError(ctx, "GetAllConversationIDsNumber failed", err) - return nil, err - } - const batchNum = 100 - - if num == 0 { - return nil, errs.New("Need Destruct Msg is nil").Wrap() - } - - maxPage := (num + batchNum - 1) / batchNum - - temp := make([]*model.Conversation, 0, maxPage*batchNum) - - for pageNumber := 0; pageNumber < int(maxPage); pageNumber++ { - pagination := &sdkws.RequestPagination{ - PageNumber: int32(pageNumber), - ShowNumber: batchNum, - } - - conversationIDs, err := c.conversationDatabase.PageConversationIDs(ctx, pagination) - if err != nil { - log.ZError(ctx, "PageConversationIDs failed", err, "pageNumber", pageNumber) - continue - } - - // log.ZDebug(ctx, "PageConversationIDs success", "pageNumber", pageNumber, "conversationIDsNum", len(conversationIDs), "conversationIDs", conversationIDs) - if len(conversationIDs) == 0 { - continue - } - - conversations, err := c.conversationDatabase.GetConversationsByConversationID(ctx, conversationIDs) - if err != nil { - log.ZError(ctx, "GetConversationsByConversationID failed", err, "conversationIDs", conversationIDs) - continue - } - - for _, conversation := range conversations { - if conversation.IsMsgDestruct && conversation.MsgDestructTime != 0 && ((time.Now().UnixMilli() > (conversation.MsgDestructTime + conversation.LatestMsgDestructTime.UnixMilli() + 8*60*60)) || // 8*60*60 is UTC+8 - conversation.LatestMsgDestructTime.IsZero()) { - temp = append(temp, conversation) - } - } - } - - return &pbconversation.GetConversationsNeedClearMsgResp{Conversations: convert.ConversationsDB2Pb(temp)}, nil -} - func (c *conversationServer) GetNotNotifyConversationIDs(ctx context.Context, req *pbconversation.GetNotNotifyConversationIDsReq) (*pbconversation.GetNotNotifyConversationIDsResp, error) { conversationIDs, err := c.conversationDatabase.GetNotNotifyConversationIDs(ctx, req.UserID) if err != nil { diff --git a/internal/rpc/msg/delete.go b/internal/rpc/msg/delete.go index d3485faaa..621fbca28 100644 --- a/internal/rpc/msg/delete.go +++ b/internal/rpc/msg/delete.go @@ -19,7 +19,6 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/authverify" "github.com/openimsdk/protocol/constant" - "github.com/openimsdk/protocol/conversation" "github.com/openimsdk/protocol/msg" "github.com/openimsdk/protocol/sdkws" "github.com/openimsdk/tools/log" @@ -74,7 +73,7 @@ func (m *msgServer) DeleteMsgs(ctx context.Context, req *msg.DeleteMsgsReq) (*ms if err := m.MsgDatabase.DeleteMsgsPhysicalBySeqs(ctx, req.ConversationID, req.Seqs); err != nil { return nil, err } - conv, err := m.conversationClient.GetConversationsByConversationID(ctx, req.ConversationID) + conv, err := m.conversationClient.GetConversation(ctx, req.ConversationID, req.UserID) if err != nil { return nil, err } @@ -113,14 +112,12 @@ func (m *msgServer) DeleteMsgPhysical(ctx context.Context, req *msg.DeleteMsgPhy } func (m *msgServer) clearConversation(ctx context.Context, conversationIDs []string, userID string, deleteSyncOpt *msg.DeleteSyncOpt) error { - conversations, err := m.conversationClient.GetConversationsByConversationIDs(ctx, conversationIDs) + conversations, err := m.conversationClient.GetConversations(ctx, conversationIDs, userID) if err != nil { return err } - var existConversations []*conversation.Conversation var existConversationIDs []string for _, conversation := range conversations { - existConversations = append(existConversations, conversation) existConversationIDs = append(existConversationIDs, conversation.ConversationID) } log.ZDebug(ctx, "ClearConversationsMsg", "existConversationIDs", existConversationIDs) @@ -149,7 +146,7 @@ func (m *msgServer) clearConversation(ctx context.Context, conversationIDs []str if err := m.MsgDatabase.SetMinSeqs(ctx, m.getMinSeqs(maxSeqs)); err != nil { return err } - for _, conversation := range existConversations { + for _, conversation := range conversations { tips := &sdkws.ClearConversationTips{UserID: userID, ConversationIDs: []string{conversation.ConversationID}} m.notificationSender.NotificationWithSessionType(ctx, userID, m.conversationAndGetRecvID(conversation, userID), constant.ClearConversationNotification, conversation.ConversationType, tips) } diff --git a/pkg/common/storage/controller/conversation.go b/pkg/common/storage/controller/conversation.go index d4088e0c0..ec9cc5fc6 100644 --- a/pkg/common/storage/controller/conversation.go +++ b/pkg/common/storage/controller/conversation.go @@ -59,8 +59,6 @@ type ConversationDatabase interface { GetAllConversationIDsNumber(ctx context.Context) (int64, error) // PageConversationIDs paginates through conversation IDs based on the specified pagination settings. PageConversationIDs(ctx context.Context, pagination pagination.Pagination) (conversationIDs []string, err error) - // GetConversationsByConversationID retrieves conversations by their IDs. - GetConversationsByConversationID(ctx context.Context, conversationIDs []string) ([]*relationtb.Conversation, error) // GetConversationIDsNeedDestruct fetches conversations that need to be destructed based on specific criteria. GetConversationIDsNeedDestruct(ctx context.Context) ([]*relationtb.Conversation, error) // GetConversationNotReceiveMessageUserIDs gets user IDs for users in a conversation who have not received messages. @@ -351,10 +349,6 @@ func (c *conversationDatabase) PageConversationIDs(ctx context.Context, paginati return c.conversationDB.PageConversationIDs(ctx, pagination) } -func (c *conversationDatabase) GetConversationsByConversationID(ctx context.Context, conversationIDs []string) ([]*relationtb.Conversation, error) { - return c.conversationDB.GetConversationsByConversationID(ctx, conversationIDs) -} - func (c *conversationDatabase) GetConversationIDsNeedDestruct(ctx context.Context) ([]*relationtb.Conversation, error) { return c.conversationDB.GetConversationIDsNeedDestruct(ctx) } diff --git a/pkg/common/storage/database/conversation.go b/pkg/common/storage/database/conversation.go index 1fb53cfed..155c698f6 100644 --- a/pkg/common/storage/database/conversation.go +++ b/pkg/common/storage/database/conversation.go @@ -38,7 +38,6 @@ type Conversation interface { GetAllConversationIDs(ctx context.Context) ([]string, error) GetAllConversationIDsNumber(ctx context.Context) (int64, error) PageConversationIDs(ctx context.Context, pagination pagination.Pagination) (conversationIDs []string, err error) - GetConversationsByConversationID(ctx context.Context, conversationIDs []string) ([]*model.Conversation, error) GetConversationIDsNeedDestruct(ctx context.Context) ([]*model.Conversation, error) GetConversationNotReceiveMessageUserIDs(ctx context.Context, conversationID string) ([]string, error) FindConversationUserVersion(ctx context.Context, userID string, version uint, limit int) (*model.VersionLog, error) diff --git a/pkg/common/storage/database/mgo/conversation.go b/pkg/common/storage/database/mgo/conversation.go index 536827450..e6be4fd4c 100644 --- a/pkg/common/storage/database/mgo/conversation.go +++ b/pkg/common/storage/database/mgo/conversation.go @@ -32,13 +32,19 @@ import ( func NewConversationMongo(db *mongo.Database) (*ConversationMgo, error) { coll := db.Collection(database.ConversationName) - _, err := coll.Indexes().CreateOne(context.Background(), mongo.IndexModel{ + _, err := coll.Indexes().CreateMany(context.Background(), []mongo.IndexModel{{ Keys: bson.D{ {Key: "owner_user_id", Value: 1}, {Key: "conversation_id", Value: 1}, }, Options: options.Index().SetUnique(true), - }) + }, { + Keys: bson.D{ + {Key: "conversation_id", Value: 1}, + }, + Options: options.Index().SetUnique(true), + }}, + ) if err != nil { return nil, errs.Wrap(err) } @@ -191,10 +197,6 @@ func (c *ConversationMgo) PageConversationIDs(ctx context.Context, pagination pa return mongoutil.FindPageOnly[string](ctx, c.coll, bson.M{}, pagination, options.Find().SetProjection(bson.M{"conversation_id": 1})) } -func (c *ConversationMgo) GetConversationsByConversationID(ctx context.Context, conversationIDs []string) ([]*model.Conversation, error) { - return mongoutil.Find[*model.Conversation](ctx, c.coll, bson.M{"conversation_id": bson.M{"$in": conversationIDs}}) -} - func (c *ConversationMgo) GetConversationIDsNeedDestruct(ctx context.Context) ([]*model.Conversation, error) { // "is_msg_destruct = 1 && msg_destruct_time != 0 && (UNIX_TIMESTAMP(NOW()) > (msg_destruct_time + UNIX_TIMESTAMP(latest_msg_destruct_time)) || latest_msg_destruct_time is NULL)" return mongoutil.Find[*model.Conversation](ctx, c.coll, bson.M{ diff --git a/pkg/rpcli/conversation.go b/pkg/rpcli/conversation.go index ba5b90cb3..a5b7446a1 100644 --- a/pkg/rpcli/conversation.go +++ b/pkg/rpcli/conversation.go @@ -2,6 +2,7 @@ package rpcli import ( "context" + "github.com/openimsdk/protocol/conversation" "google.golang.org/grpc" ) @@ -30,18 +31,6 @@ func (x *ConversationClient) SetConversations(ctx context.Context, ownerUserIDs return ignoreResp(x.ConversationClient.SetConversations(ctx, req)) } -func (x *ConversationClient) GetConversationsByConversationIDs(ctx context.Context, conversationIDs []string) ([]*conversation.Conversation, error) { - if len(conversationIDs) == 0 { - return nil, nil - } - req := &conversation.GetConversationsByConversationIDReq{ConversationIDs: conversationIDs} - return extractField(ctx, x.ConversationClient.GetConversationsByConversationID, req, (*conversation.GetConversationsByConversationIDResp).GetConversations) -} - -func (x *ConversationClient) GetConversationsByConversationID(ctx context.Context, conversationID string) (*conversation.Conversation, error) { - return firstValue(x.GetConversationsByConversationIDs(ctx, []string{conversationID})) -} - func (x *ConversationClient) SetConversationMinSeq(ctx context.Context, conversationID string, ownerUserIDs []string, minSeq int64) error { if len(ownerUserIDs) == 0 { return nil