bugfix(conversation):removed unexpectedly called functions and itself to avoid out of index query.

pull/3667/head
WhereAreBugs 2 days ago
parent 5f81f632f8
commit 86b05e16b6

@ -24,7 +24,6 @@ import (
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/redis"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database/mgo"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
dbModel "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"github.com/openimsdk/open-im-server/v3/pkg/localcache"
"github.com/openimsdk/open-im-server/v3/pkg/msgprocessor"
@ -506,17 +505,6 @@ func (c *conversationServer) GetUserConversationIDsHash(ctx context.Context, req
return &pbconversation.GetUserConversationIDsHashResp{Hash: hash}, nil
}
func (c *conversationServer) GetConversationsByConversationID(
ctx context.Context,
req *pbconversation.GetConversationsByConversationIDReq,
) (*pbconversation.GetConversationsByConversationIDResp, error) {
conversations, err := c.conversationDatabase.GetConversationsByConversationID(ctx, req.ConversationIDs)
if err != nil {
return nil, err
}
return &pbconversation.GetConversationsByConversationIDResp{Conversations: convert.ConversationsDB2Pb(conversations)}, nil
}
func (c *conversationServer) GetConversationOfflinePushUserIDs(ctx context.Context, req *pbconversation.GetConversationOfflinePushUserIDsReq) (*pbconversation.GetConversationOfflinePushUserIDsResp, error) {
if req.ConversationID == "" {
return nil, errs.ErrArgs.WrapMsg("conversationID is empty")
@ -708,56 +696,6 @@ func (c *conversationServer) GetOwnerConversation(ctx context.Context, req *pbco
}, nil
}
func (c *conversationServer) GetConversationsNeedClearMsg(ctx context.Context, _ *pbconversation.GetConversationsNeedClearMsgReq) (*pbconversation.GetConversationsNeedClearMsgResp, error) {
num, err := c.conversationDatabase.GetAllConversationIDsNumber(ctx)
if err != nil {
log.ZError(ctx, "GetAllConversationIDsNumber failed", err)
return nil, err
}
const batchNum = 100
if num == 0 {
return nil, errs.New("Need Destruct Msg is nil").Wrap()
}
maxPage := (num + batchNum - 1) / batchNum
temp := make([]*model.Conversation, 0, maxPage*batchNum)
for pageNumber := 0; pageNumber < int(maxPage); pageNumber++ {
pagination := &sdkws.RequestPagination{
PageNumber: int32(pageNumber),
ShowNumber: batchNum,
}
conversationIDs, err := c.conversationDatabase.PageConversationIDs(ctx, pagination)
if err != nil {
log.ZError(ctx, "PageConversationIDs failed", err, "pageNumber", pageNumber)
continue
}
// log.ZDebug(ctx, "PageConversationIDs success", "pageNumber", pageNumber, "conversationIDsNum", len(conversationIDs), "conversationIDs", conversationIDs)
if len(conversationIDs) == 0 {
continue
}
conversations, err := c.conversationDatabase.GetConversationsByConversationID(ctx, conversationIDs)
if err != nil {
log.ZError(ctx, "GetConversationsByConversationID failed", err, "conversationIDs", conversationIDs)
continue
}
for _, conversation := range conversations {
if conversation.IsMsgDestruct && conversation.MsgDestructTime != 0 && ((time.Now().UnixMilli() > (conversation.MsgDestructTime + conversation.LatestMsgDestructTime.UnixMilli() + 8*60*60)) || // 8*60*60 is UTC+8
conversation.LatestMsgDestructTime.IsZero()) {
temp = append(temp, conversation)
}
}
}
return &pbconversation.GetConversationsNeedClearMsgResp{Conversations: convert.ConversationsDB2Pb(temp)}, nil
}
func (c *conversationServer) GetNotNotifyConversationIDs(ctx context.Context, req *pbconversation.GetNotNotifyConversationIDsReq) (*pbconversation.GetNotNotifyConversationIDsResp, error) {
conversationIDs, err := c.conversationDatabase.GetNotNotifyConversationIDs(ctx, req.UserID)
if err != nil {

@ -19,7 +19,6 @@ import (
"github.com/openimsdk/open-im-server/v3/pkg/authverify"
"github.com/openimsdk/protocol/constant"
"github.com/openimsdk/protocol/conversation"
"github.com/openimsdk/protocol/msg"
"github.com/openimsdk/protocol/sdkws"
"github.com/openimsdk/tools/log"
@ -74,7 +73,7 @@ func (m *msgServer) DeleteMsgs(ctx context.Context, req *msg.DeleteMsgsReq) (*ms
if err := m.MsgDatabase.DeleteMsgsPhysicalBySeqs(ctx, req.ConversationID, req.Seqs); err != nil {
return nil, err
}
conv, err := m.conversationClient.GetConversationsByConversationID(ctx, req.ConversationID)
conv, err := m.conversationClient.GetConversation(ctx, req.ConversationID, req.UserID)
if err != nil {
return nil, err
}
@ -113,14 +112,12 @@ func (m *msgServer) DeleteMsgPhysical(ctx context.Context, req *msg.DeleteMsgPhy
}
func (m *msgServer) clearConversation(ctx context.Context, conversationIDs []string, userID string, deleteSyncOpt *msg.DeleteSyncOpt) error {
conversations, err := m.conversationClient.GetConversationsByConversationIDs(ctx, conversationIDs)
conversations, err := m.conversationClient.GetConversations(ctx, conversationIDs, userID)
if err != nil {
return err
}
var existConversations []*conversation.Conversation
var existConversationIDs []string
for _, conversation := range conversations {
existConversations = append(existConversations, conversation)
existConversationIDs = append(existConversationIDs, conversation.ConversationID)
}
log.ZDebug(ctx, "ClearConversationsMsg", "existConversationIDs", existConversationIDs)
@ -149,7 +146,7 @@ func (m *msgServer) clearConversation(ctx context.Context, conversationIDs []str
if err := m.MsgDatabase.SetMinSeqs(ctx, m.getMinSeqs(maxSeqs)); err != nil {
return err
}
for _, conversation := range existConversations {
for _, conversation := range conversations {
tips := &sdkws.ClearConversationTips{UserID: userID, ConversationIDs: []string{conversation.ConversationID}}
m.notificationSender.NotificationWithSessionType(ctx, userID, m.conversationAndGetRecvID(conversation, userID), constant.ClearConversationNotification, conversation.ConversationType, tips)
}

@ -59,8 +59,6 @@ type ConversationDatabase interface {
GetAllConversationIDsNumber(ctx context.Context) (int64, error)
// PageConversationIDs paginates through conversation IDs based on the specified pagination settings.
PageConversationIDs(ctx context.Context, pagination pagination.Pagination) (conversationIDs []string, err error)
// GetConversationsByConversationID retrieves conversations by their IDs.
GetConversationsByConversationID(ctx context.Context, conversationIDs []string) ([]*relationtb.Conversation, error)
// GetConversationIDsNeedDestruct fetches conversations that need to be destructed based on specific criteria.
GetConversationIDsNeedDestruct(ctx context.Context) ([]*relationtb.Conversation, error)
// GetConversationNotReceiveMessageUserIDs gets user IDs for users in a conversation who have not received messages.
@ -351,10 +349,6 @@ func (c *conversationDatabase) PageConversationIDs(ctx context.Context, paginati
return c.conversationDB.PageConversationIDs(ctx, pagination)
}
func (c *conversationDatabase) GetConversationsByConversationID(ctx context.Context, conversationIDs []string) ([]*relationtb.Conversation, error) {
return c.conversationDB.GetConversationsByConversationID(ctx, conversationIDs)
}
func (c *conversationDatabase) GetConversationIDsNeedDestruct(ctx context.Context) ([]*relationtb.Conversation, error) {
return c.conversationDB.GetConversationIDsNeedDestruct(ctx)
}

@ -38,7 +38,6 @@ type Conversation interface {
GetAllConversationIDs(ctx context.Context) ([]string, error)
GetAllConversationIDsNumber(ctx context.Context) (int64, error)
PageConversationIDs(ctx context.Context, pagination pagination.Pagination) (conversationIDs []string, err error)
GetConversationsByConversationID(ctx context.Context, conversationIDs []string) ([]*model.Conversation, error)
GetConversationIDsNeedDestruct(ctx context.Context) ([]*model.Conversation, error)
GetConversationNotReceiveMessageUserIDs(ctx context.Context, conversationID string) ([]string, error)
FindConversationUserVersion(ctx context.Context, userID string, version uint, limit int) (*model.VersionLog, error)

@ -32,13 +32,19 @@ import (
func NewConversationMongo(db *mongo.Database) (*ConversationMgo, error) {
coll := db.Collection(database.ConversationName)
_, err := coll.Indexes().CreateOne(context.Background(), mongo.IndexModel{
_, err := coll.Indexes().CreateMany(context.Background(), []mongo.IndexModel{{
Keys: bson.D{
{Key: "owner_user_id", Value: 1},
{Key: "conversation_id", Value: 1},
},
Options: options.Index().SetUnique(true),
})
}, {
Keys: bson.D{
{Key: "conversation_id", Value: 1},
},
Options: options.Index().SetUnique(true),
}},
)
if err != nil {
return nil, errs.Wrap(err)
}
@ -191,10 +197,6 @@ func (c *ConversationMgo) PageConversationIDs(ctx context.Context, pagination pa
return mongoutil.FindPageOnly[string](ctx, c.coll, bson.M{}, pagination, options.Find().SetProjection(bson.M{"conversation_id": 1}))
}
func (c *ConversationMgo) GetConversationsByConversationID(ctx context.Context, conversationIDs []string) ([]*model.Conversation, error) {
return mongoutil.Find[*model.Conversation](ctx, c.coll, bson.M{"conversation_id": bson.M{"$in": conversationIDs}})
}
func (c *ConversationMgo) GetConversationIDsNeedDestruct(ctx context.Context) ([]*model.Conversation, error) {
// "is_msg_destruct = 1 && msg_destruct_time != 0 && (UNIX_TIMESTAMP(NOW()) > (msg_destruct_time + UNIX_TIMESTAMP(latest_msg_destruct_time)) || latest_msg_destruct_time is NULL)"
return mongoutil.Find[*model.Conversation](ctx, c.coll, bson.M{

@ -2,6 +2,7 @@ package rpcli
import (
"context"
"github.com/openimsdk/protocol/conversation"
"google.golang.org/grpc"
)
@ -30,18 +31,6 @@ func (x *ConversationClient) SetConversations(ctx context.Context, ownerUserIDs
return ignoreResp(x.ConversationClient.SetConversations(ctx, req))
}
func (x *ConversationClient) GetConversationsByConversationIDs(ctx context.Context, conversationIDs []string) ([]*conversation.Conversation, error) {
if len(conversationIDs) == 0 {
return nil, nil
}
req := &conversation.GetConversationsByConversationIDReq{ConversationIDs: conversationIDs}
return extractField(ctx, x.ConversationClient.GetConversationsByConversationID, req, (*conversation.GetConversationsByConversationIDResp).GetConversations)
}
func (x *ConversationClient) GetConversationsByConversationID(ctx context.Context, conversationID string) (*conversation.Conversation, error) {
return firstValue(x.GetConversationsByConversationIDs(ctx, []string{conversationID}))
}
func (x *ConversationClient) SetConversationMinSeq(ctx context.Context, conversationID string, ownerUserIDs []string, minSeq int64) error {
if len(ownerUserIDs) == 0 {
return nil

Loading…
Cancel
Save