From a0e6d9aa69f8a49852c1d82a2dc13221da7ad540 Mon Sep 17 00:00:00 2001 From: chao <48119764+withchao@users.noreply.github.com> Date: Thu, 15 Jan 2026 14:24:17 +0800 Subject: [PATCH 1/2] fix: Mongo Malloc upsert overwrites min_seq initialization (#3657) * fix: performance issues with Kafka caused by encapsulating the MQ interface * fix: admin token in standalone mode * fix: full id version * fix: resolve deadlock in cache eviction and improve GetBatch implementation * refactor: replace LongConn with ClientConn interface and simplify message handling * refactor: replace LongConn with ClientConn interface and simplify message handling * fix: seq use $setOnInsert for min_seq in conversation update --- pkg/common/storage/database/mgo/seq_conversation.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/common/storage/database/mgo/seq_conversation.go b/pkg/common/storage/database/mgo/seq_conversation.go index 7971b7e1a..b9eb6c2a9 100644 --- a/pkg/common/storage/database/mgo/seq_conversation.go +++ b/pkg/common/storage/database/mgo/seq_conversation.go @@ -57,8 +57,8 @@ func (s *seqConversationMongo) Malloc(ctx context.Context, conversationID string } filter := map[string]any{"conversation_id": conversationID} update := map[string]any{ - "$inc": map[string]any{"max_seq": size}, - "$set": map[string]any{"min_seq": int64(0)}, + "$inc": map[string]any{"max_seq": size}, + "$setOnInsert": map[string]any{"min_seq": int64(0)}, } opt := options.FindOneAndUpdate().SetUpsert(true).SetReturnDocument(options.After).SetProjection(map[string]any{"_id": 0, "max_seq": 1}) lastSeq, err := mongoutil.FindOneAndUpdate[int64](ctx, s.coll, filter, update, opt) From 579db3bd4861cc61cd829764f7ce061a16389b46 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=A5=9E=E5=A5=87bug=E5=9C=A8=E5=93=AA=E9=87=8C?= <35194310+WhereAreBugs@users.noreply.github.com> Date: Wed, 21 Jan 2026 18:10:29 +0800 Subject: [PATCH 2/2] bugfix(conversation):removed unexpectedly called functions and itself to avoid out of index query. (#3668) # Conflicts: # internal/rpc/conversation/conversation.go # pkg/common/storage/database/mgo/conversation.go --- internal/rpc/conversation/conversation.go | 58 ------------------- internal/rpc/msg/delete.go | 9 +-- pkg/common/storage/controller/conversation.go | 6 -- pkg/common/storage/database/conversation.go | 1 - .../storage/database/mgo/conversation.go | 10 ++-- pkg/rpcli/conversation.go | 13 +---- 6 files changed, 10 insertions(+), 87 deletions(-) diff --git a/internal/rpc/conversation/conversation.go b/internal/rpc/conversation/conversation.go index 562a76d82..a48c06051 100644 --- a/internal/rpc/conversation/conversation.go +++ b/internal/rpc/conversation/conversation.go @@ -513,14 +513,6 @@ func (c *conversationServer) GetUserConversationIDsHash(ctx context.Context, req return &pbconversation.GetUserConversationIDsHashResp{Hash: hash}, nil } -func (c *conversationServer) GetConversationsByConversationID(ctx context.Context, req *pbconversation.GetConversationsByConversationIDReq) (*pbconversation.GetConversationsByConversationIDResp, error) { - conversations, err := c.conversationDatabase.GetConversationsByConversationID(ctx, req.ConversationIDs) - if err != nil { - return nil, err - } - return &pbconversation.GetConversationsByConversationIDResp{Conversations: convert.ConversationsDB2Pb(conversations)}, nil -} - func (c *conversationServer) GetConversationOfflinePushUserIDs(ctx context.Context, req *pbconversation.GetConversationOfflinePushUserIDsReq) (*pbconversation.GetConversationOfflinePushUserIDsResp, error) { if req.ConversationID == "" { return nil, errs.ErrArgs.WrapMsg("conversationID is empty") @@ -717,56 +709,6 @@ func (c *conversationServer) GetOwnerConversation(ctx context.Context, req *pbco }, nil } -func (c *conversationServer) GetConversationsNeedClearMsg(ctx context.Context, _ *pbconversation.GetConversationsNeedClearMsgReq) (*pbconversation.GetConversationsNeedClearMsgResp, error) { - num, err := c.conversationDatabase.GetAllConversationIDsNumber(ctx) - if err != nil { - log.ZError(ctx, "GetAllConversationIDsNumber failed", err) - return nil, err - } - const batchNum = 100 - - if num == 0 { - return nil, errs.New("Need Destruct Msg is nil").Wrap() - } - - maxPage := (num + batchNum - 1) / batchNum - - temp := make([]*dbModel.Conversation, 0, maxPage*batchNum) - - for pageNumber := 0; pageNumber < int(maxPage); pageNumber++ { - pagination := &sdkws.RequestPagination{ - PageNumber: int32(pageNumber), - ShowNumber: batchNum, - } - - conversationIDs, err := c.conversationDatabase.PageConversationIDs(ctx, pagination) - if err != nil { - log.ZError(ctx, "PageConversationIDs failed", err, "pageNumber", pageNumber) - continue - } - - // log.ZDebug(ctx, "PageConversationIDs success", "pageNumber", pageNumber, "conversationIDsNum", len(conversationIDs), "conversationIDs", conversationIDs) - if len(conversationIDs) == 0 { - continue - } - - conversations, err := c.conversationDatabase.GetConversationsByConversationID(ctx, conversationIDs) - if err != nil { - log.ZError(ctx, "GetConversationsByConversationID failed", err, "conversationIDs", conversationIDs) - continue - } - - for _, conversation := range conversations { - if conversation.IsMsgDestruct && conversation.MsgDestructTime != 0 && ((time.Now().UnixMilli() > (conversation.MsgDestructTime + conversation.LatestMsgDestructTime.UnixMilli() + 8*60*60)) || // 8*60*60 is UTC+8 - conversation.LatestMsgDestructTime.IsZero()) { - temp = append(temp, conversation) - } - } - } - - return &pbconversation.GetConversationsNeedClearMsgResp{Conversations: convert.ConversationsDB2Pb(temp)}, nil -} - func (c *conversationServer) GetNotNotifyConversationIDs(ctx context.Context, req *pbconversation.GetNotNotifyConversationIDsReq) (*pbconversation.GetNotNotifyConversationIDsResp, error) { if err := authverify.CheckAccess(ctx, req.UserID); err != nil { return nil, err diff --git a/internal/rpc/msg/delete.go b/internal/rpc/msg/delete.go index d24420ebb..41602e2e6 100644 --- a/internal/rpc/msg/delete.go +++ b/internal/rpc/msg/delete.go @@ -19,7 +19,6 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/authverify" "github.com/openimsdk/protocol/constant" - "github.com/openimsdk/protocol/conversation" "github.com/openimsdk/protocol/msg" "github.com/openimsdk/protocol/sdkws" "github.com/openimsdk/tools/log" @@ -74,7 +73,7 @@ func (m *msgServer) DeleteMsgs(ctx context.Context, req *msg.DeleteMsgsReq) (*ms if err := m.MsgDatabase.DeleteMsgsPhysicalBySeqs(ctx, req.ConversationID, req.Seqs); err != nil { return nil, err } - conv, err := m.conversationClient.GetConversationsByConversationID(ctx, req.ConversationID) + conv, err := m.conversationClient.GetConversation(ctx, req.ConversationID, req.UserID) if err != nil { return nil, err } @@ -116,14 +115,12 @@ func (m *msgServer) DeleteMsgPhysical(ctx context.Context, req *msg.DeleteMsgPhy } func (m *msgServer) clearConversation(ctx context.Context, conversationIDs []string, userID string, deleteSyncOpt *msg.DeleteSyncOpt) error { - conversations, err := m.conversationClient.GetConversationsByConversationIDs(ctx, conversationIDs) + conversations, err := m.conversationClient.GetConversations(ctx, conversationIDs, userID) if err != nil { return err } - var existConversations []*conversation.Conversation var existConversationIDs []string for _, conversation := range conversations { - existConversations = append(existConversations, conversation) existConversationIDs = append(existConversationIDs, conversation.ConversationID) } log.ZDebug(ctx, "ClearConversationsMsg", "existConversationIDs", existConversationIDs) @@ -152,7 +149,7 @@ func (m *msgServer) clearConversation(ctx context.Context, conversationIDs []str if err := m.MsgDatabase.SetMinSeqs(ctx, m.getMinSeqs(maxSeqs)); err != nil { return err } - for _, conversation := range existConversations { + for _, conversation := range conversations { tips := &sdkws.ClearConversationTips{UserID: userID, ConversationIDs: []string{conversation.ConversationID}} m.notificationSender.NotificationWithSessionType(ctx, userID, m.conversationAndGetRecvID(conversation, userID), constant.ClearConversationNotification, conversation.ConversationType, tips) } diff --git a/pkg/common/storage/controller/conversation.go b/pkg/common/storage/controller/conversation.go index d085c7466..5ec62b33c 100644 --- a/pkg/common/storage/controller/conversation.go +++ b/pkg/common/storage/controller/conversation.go @@ -61,8 +61,6 @@ type ConversationDatabase interface { GetAllConversationIDsNumber(ctx context.Context) (int64, error) // PageConversationIDs paginates through conversation IDs based on the specified pagination settings. PageConversationIDs(ctx context.Context, pagination pagination.Pagination) (conversationIDs []string, err error) - // GetConversationsByConversationID retrieves conversations by their IDs. - GetConversationsByConversationID(ctx context.Context, conversationIDs []string) ([]*relationtb.Conversation, error) // GetConversationIDsNeedDestruct fetches conversations that need to be destructed based on specific criteria. GetConversationIDsNeedDestruct(ctx context.Context) ([]*relationtb.Conversation, error) // GetConversationNotReceiveMessageUserIDs gets user IDs for users in a conversation who have not received messages. @@ -375,10 +373,6 @@ func (c *conversationDatabase) PageConversationIDs(ctx context.Context, paginati return c.conversationDB.PageConversationIDs(ctx, pagination) } -func (c *conversationDatabase) GetConversationsByConversationID(ctx context.Context, conversationIDs []string) ([]*relationtb.Conversation, error) { - return c.conversationDB.GetConversationsByConversationID(ctx, conversationIDs) -} - func (c *conversationDatabase) GetConversationIDsNeedDestruct(ctx context.Context) ([]*relationtb.Conversation, error) { return c.conversationDB.GetConversationIDsNeedDestruct(ctx) } diff --git a/pkg/common/storage/database/conversation.go b/pkg/common/storage/database/conversation.go index 562782346..6a93aa955 100644 --- a/pkg/common/storage/database/conversation.go +++ b/pkg/common/storage/database/conversation.go @@ -39,7 +39,6 @@ type Conversation interface { GetAllConversationIDs(ctx context.Context) ([]string, error) GetAllConversationIDsNumber(ctx context.Context) (int64, error) PageConversationIDs(ctx context.Context, pagination pagination.Pagination) (conversationIDs []string, err error) - GetConversationsByConversationID(ctx context.Context, conversationIDs []string) ([]*model.Conversation, error) GetConversationIDsNeedDestruct(ctx context.Context) ([]*model.Conversation, error) GetConversationNotReceiveMessageUserIDs(ctx context.Context, conversationID string) ([]string, error) FindConversationUserVersion(ctx context.Context, userID string, version uint, limit int) (*model.VersionLog, error) diff --git a/pkg/common/storage/database/mgo/conversation.go b/pkg/common/storage/database/mgo/conversation.go index d5ab046aa..c1d00f0ed 100644 --- a/pkg/common/storage/database/mgo/conversation.go +++ b/pkg/common/storage/database/mgo/conversation.go @@ -47,6 +47,12 @@ func NewConversationMongo(db *mongo.Database) (*ConversationMgo, error) { }, Options: options.Index(), }, + { + Keys: bson.D{ + {Key: "conversation_id", Value: 1}, + }, + Options: options.Index().SetUnique(true), + }, }) if err != nil { return nil, errs.Wrap(err) @@ -232,10 +238,6 @@ func (c *ConversationMgo) PageConversationIDs(ctx context.Context, pagination pa return mongoutil.FindPageOnly[string](ctx, c.coll, bson.M{}, pagination, options.Find().SetProjection(bson.M{"conversation_id": 1})) } -func (c *ConversationMgo) GetConversationsByConversationID(ctx context.Context, conversationIDs []string) ([]*model.Conversation, error) { - return mongoutil.Find[*model.Conversation](ctx, c.coll, bson.M{"conversation_id": bson.M{"$in": conversationIDs}}) -} - func (c *ConversationMgo) GetConversationIDsNeedDestruct(ctx context.Context) ([]*model.Conversation, error) { // "is_msg_destruct = 1 && msg_destruct_time != 0 && (UNIX_TIMESTAMP(NOW()) > (msg_destruct_time + UNIX_TIMESTAMP(latest_msg_destruct_time)) || latest_msg_destruct_time is NULL)" return mongoutil.Find[*model.Conversation](ctx, c.coll, bson.M{ diff --git a/pkg/rpcli/conversation.go b/pkg/rpcli/conversation.go index ba5b90cb3..a5b7446a1 100644 --- a/pkg/rpcli/conversation.go +++ b/pkg/rpcli/conversation.go @@ -2,6 +2,7 @@ package rpcli import ( "context" + "github.com/openimsdk/protocol/conversation" "google.golang.org/grpc" ) @@ -30,18 +31,6 @@ func (x *ConversationClient) SetConversations(ctx context.Context, ownerUserIDs return ignoreResp(x.ConversationClient.SetConversations(ctx, req)) } -func (x *ConversationClient) GetConversationsByConversationIDs(ctx context.Context, conversationIDs []string) ([]*conversation.Conversation, error) { - if len(conversationIDs) == 0 { - return nil, nil - } - req := &conversation.GetConversationsByConversationIDReq{ConversationIDs: conversationIDs} - return extractField(ctx, x.ConversationClient.GetConversationsByConversationID, req, (*conversation.GetConversationsByConversationIDResp).GetConversations) -} - -func (x *ConversationClient) GetConversationsByConversationID(ctx context.Context, conversationID string) (*conversation.Conversation, error) { - return firstValue(x.GetConversationsByConversationIDs(ctx, []string{conversationID})) -} - func (x *ConversationClient) SetConversationMinSeq(ctx context.Context, conversationID string, ownerUserIDs []string, minSeq int64) error { if len(ownerUserIDs) == 0 { return nil