|
|
|
@ -18,11 +18,12 @@ import (
|
|
|
|
|
"context"
|
|
|
|
|
"encoding/json"
|
|
|
|
|
"errors"
|
|
|
|
|
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
|
|
|
|
|
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
|
|
|
|
|
"strings"
|
|
|
|
|
"time"
|
|
|
|
|
|
|
|
|
|
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
|
|
|
|
|
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
|
|
|
|
|
|
|
|
|
|
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
|
|
|
|
|
"github.com/openimsdk/open-im-server/v3/pkg/common/convert"
|
|
|
|
|
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
|
|
|
|
@ -97,8 +98,10 @@ type CommonMsgDatabase interface {
|
|
|
|
|
ConvertMsgsDocLen(ctx context.Context, conversationIDs []string)
|
|
|
|
|
|
|
|
|
|
// clear msg
|
|
|
|
|
GetBeforeMsg(ctx context.Context, ts int64, limit int) ([]*model.MsgDocModel, error)
|
|
|
|
|
GetBeforeMsg(ctx context.Context, ts int64, docIds []string, limit int) ([]*model.MsgDocModel, error)
|
|
|
|
|
DeleteDocMsgBefore(ctx context.Context, ts int64, doc *model.MsgDocModel) ([]int, error)
|
|
|
|
|
|
|
|
|
|
GetDocIDs(ctx context.Context) ([]string, error)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func NewCommonMsgDatabase(msgDocModel database.Msg, msg cache.MsgCache, seqUser cache.SeqUser, seqConversation cache.SeqConversationCache, kafkaConf *config.Kafka) (CommonMsgDatabase, error) {
|
|
|
|
@ -912,8 +915,16 @@ func (db *commonMsgDatabase) ConvertMsgsDocLen(ctx context.Context, conversation
|
|
|
|
|
db.msgDocDatabase.ConvertMsgsDocLen(ctx, conversationIDs)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (db *commonMsgDatabase) GetBeforeMsg(ctx context.Context, ts int64, limit int) ([]*model.MsgDocModel, error) {
|
|
|
|
|
return db.msgDocDatabase.GetBeforeMsg(ctx, ts, limit)
|
|
|
|
|
func (db *commonMsgDatabase) GetBeforeMsg(ctx context.Context, ts int64, docIDs []string, limit int) ([]*model.MsgDocModel, error) {
|
|
|
|
|
var msgs []*model.MsgDocModel
|
|
|
|
|
for i := 0; i < len(docIDs); i += 1000 {
|
|
|
|
|
res, err := db.msgDocDatabase.GetBeforeMsg(ctx, ts, docIDs[i:i+1000], limit)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
msgs = append(msgs, res...)
|
|
|
|
|
}
|
|
|
|
|
return msgs, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (db *commonMsgDatabase) DeleteDocMsgBefore(ctx context.Context, ts int64, doc *model.MsgDocModel) ([]int, error) {
|
|
|
|
@ -955,3 +966,16 @@ func (db *commonMsgDatabase) setMinSeq(ctx context.Context, conversationID strin
|
|
|
|
|
}
|
|
|
|
|
return db.seqConversation.SetMinSeq(ctx, conversationID, seq)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (db *commonMsgDatabase) GetDocIDs(ctx context.Context) ([]string, error) {
|
|
|
|
|
var docIDsList []string
|
|
|
|
|
|
|
|
|
|
docIDs, err := db.msgDocDatabase.GetDocIDs(ctx)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, errs.Wrap(err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
docIDsList = append(docIDsList, docIDs...)
|
|
|
|
|
|
|
|
|
|
return docIDsList, nil
|
|
|
|
|
}
|
|
|
|
|