You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
cloudreve/pkg/filemanager/manager/fulltextindex.go

537 lines
16 KiB

package manager
import (
"context"
"encoding/json"
"fmt"
"github.com/cloudreve/Cloudreve/v4/application/dependency"
"github.com/cloudreve/Cloudreve/v4/ent"
"github.com/cloudreve/Cloudreve/v4/ent/task"
"github.com/cloudreve/Cloudreve/v4/inventory"
"github.com/cloudreve/Cloudreve/v4/inventory/types"
"github.com/cloudreve/Cloudreve/v4/pkg/filemanager/fs"
"github.com/cloudreve/Cloudreve/v4/pkg/filemanager/fs/dbfs"
"github.com/cloudreve/Cloudreve/v4/pkg/hashid"
"github.com/cloudreve/Cloudreve/v4/pkg/logging"
"github.com/cloudreve/Cloudreve/v4/pkg/queue"
"github.com/cloudreve/Cloudreve/v4/pkg/searcher"
"github.com/cloudreve/Cloudreve/v4/pkg/util"
"github.com/samber/lo"
)
// FullTextIndexTask is a queue task that indexes a single file for full-text
// search. It embeds *queue.DBTask.
type FullTextIndexTask struct {
	*queue.DBTask
}

// FullTextIndexTaskState is the JSON-serialized private state of a
// FullTextIndexTask. Uri locates the file, EntityID is the entity the task was
// created for (the task is skipped when the file's primary entity no longer
// matches it), and FileID / OwnerID are forwarded to the search indexer.
type FullTextIndexTaskState struct {
	Uri      *fs.URI `json:"uri"`
	EntityID int     `json:"entity_id"`
	FileID   int     `json:"file_id"`
	OwnerID  int     `json:"owner_id"`
}

// ftsFileInfo bundles the identifiers and name of a file.
// NOTE(review): not referenced in the visible part of this file; presumably
// used by other full-text-search code in the package — confirm.
type ftsFileInfo struct {
	FileID   int
	OwnerID  int
	EntityID int
	FileName string
}
// SearchFullText queries the search indexer on behalf of the current user and
// resolves every hit to a file via TraverseFile.
//
// Hits whose file cannot be traversed are logged at debug level and dropped.
// When a non-empty page of hits yields no usable file, the search is repeated
// at the offset just past that page. An empty page from the indexer ends the
// search with an empty result set.
func (m *manager) SearchFullText(ctx context.Context, query string, offset int) (*FullTextSearchResults, error) {
	for {
		hits, total, err := m.dep.SearchIndexer(ctx).Search(ctx, m.user.ID, query, offset)
		if err != nil {
			return nil, fmt.Errorf("failed to search full text: %w", err)
		}

		if len(hits) == 0 {
			// The indexer has nothing (more) to offer.
			return &FullTextSearchResults{}, nil
		}

		// Resolve each hit to a file, keeping only the ones that can be traversed.
		resolved := make([]FullTextSearchResult, 0, len(hits))
		for _, hit := range hits {
			file, err := m.TraverseFile(ctx, hit.FileID)
			if err != nil {
				m.l.Debug("Failed to traverse file %d for full text search: %s, skipping.", hit.FileID, err)
				continue
			}

			resolved = append(resolved, FullTextSearchResult{
				File:    file,
				Content: hit.Text,
			})
		}

		if len(resolved) > 0 {
			return &FullTextSearchResults{
				Hits:  resolved,
				Total: total,
			}, nil
		}

		// Every hit on this page was unusable; move on to the next page.
		offset += len(hits)
	}
}
// init registers the resumable-task factories for all full-text index task
// types defined in this file.
func init() {
	register := queue.RegisterResumableTaskFactory

	register(queue.FullTextIndexTaskType, NewFullTextIndexTaskFromModel)
	register(queue.FullTextCopyTaskType, NewFullTextCopyTaskFromModel)
	register(queue.FullTextChangeOwnerTaskType, NewFullTextChangeOwnerTaskFromModel)
	register(queue.FullTextDeleteTaskType, NewFullTextDeleteTaskFromModel)
}
// NewFullTextIndexTask builds a FullTextIndexTask for the given file.
//
// The uri, entityID, fileID and ownerID are JSON-encoded into the task's
// private state; creator becomes the task's direct owner. An error is returned
// only when the state cannot be marshaled.
func NewFullTextIndexTask(ctx context.Context, uri *fs.URI, entityID, fileID, ownerID int, creator *ent.User) (*FullTextIndexTask, error) {
	payload, err := json.Marshal(&FullTextIndexTaskState{
		Uri:      uri,
		EntityID: entityID,
		FileID:   fileID,
		OwnerID:  ownerID,
	})
	if err != nil {
		return nil, fmt.Errorf("failed to marshal state: %w", err)
	}

	model := &ent.Task{
		Type:          queue.FullTextIndexTaskType,
		CorrelationID: logging.CorrelationID(ctx),
		PrivateState:  string(payload),
		PublicState:   &types.TaskPublicState{},
	}

	return &FullTextIndexTask{
		DBTask: &queue.DBTask{
			DirectOwner: creator,
			Task:        model,
		},
	}, nil
}
// NewFullTextIndexTaskFromModel wraps a stored task model in a
// FullTextIndexTask. It is the factory registered for
// queue.FullTextIndexTaskType.
func NewFullTextIndexTaskFromModel(t *ent.Task) queue.Task {
	base := &queue.DBTask{Task: t}
	return &FullTextIndexTask{DBTask: base}
}
// FullTextCopyTask is a queue task that copies the search index of one file to
// another file. It embeds *queue.DBTask.
type FullTextCopyTask struct {
	*queue.DBTask
}

// FullTextCopyTaskState is the JSON-serialized private state of a
// FullTextCopyTask. OriginalFileID is the file whose index is copied, FileID is
// the file receiving the copy, and Uri / EntityID identify the receiving file
// and the entity the task was created for.
type FullTextCopyTaskState struct {
	Uri            *fs.URI `json:"uri"`
	OriginalFileID int     `json:"original_file_id"`
	FileID         int     `json:"file_id"`
	OwnerID        int     `json:"owner_id"`
	EntityID       int     `json:"entity_id"`
}
// NewFullTextCopyTask builds a FullTextCopyTask that copies the index of
// originalFileID to fileID.
//
// All identifiers and the uri are JSON-encoded into the task's private state;
// creator becomes the task's direct owner. An error is returned only when the
// state cannot be marshaled.
func NewFullTextCopyTask(ctx context.Context, uri *fs.URI, originalFileID, fileID, ownerID, entityID int, creator *ent.User) (*FullTextCopyTask, error) {
	payload, err := json.Marshal(&FullTextCopyTaskState{
		Uri:            uri,
		OriginalFileID: originalFileID,
		FileID:         fileID,
		OwnerID:        ownerID,
		EntityID:       entityID,
	})
	if err != nil {
		return nil, fmt.Errorf("failed to marshal state: %w", err)
	}

	model := &ent.Task{
		Type:          queue.FullTextCopyTaskType,
		CorrelationID: logging.CorrelationID(ctx),
		PrivateState:  string(payload),
		PublicState:   &types.TaskPublicState{},
	}

	return &FullTextCopyTask{
		DBTask: &queue.DBTask{
			DirectOwner: creator,
			Task:        model,
		},
	}, nil
}
// NewFullTextCopyTaskFromModel wraps a stored task model in a
// FullTextCopyTask. It is the factory registered for
// queue.FullTextCopyTaskType.
func NewFullTextCopyTaskFromModel(t *ent.Task) queue.Task {
	base := &queue.DBTask{Task: t}
	return &FullTextCopyTask{DBTask: base}
}
// Do copies the search index of the original file to the target file.
//
// The task completes without doing anything when FTS is disabled or when the
// target file's primary entity no longer matches the one recorded in the task
// state. If the indexer cannot copy the index, the task falls back to a full
// re-index through performIndexing. After a successful copy, the file's
// metadata is patched with the encoded entity ID under dbfs.FullTextIndexKey.
func (t *FullTextCopyTask) Do(ctx context.Context) (task.Status, error) {
	dep := dependency.FromContext(ctx)
	logger := dep.Logger()
	fm := NewFileManager(dep, inventory.UserFromContext(ctx)).(*manager)

	if ftsOn := fm.settings.FTSEnabled(ctx); !ftsOn {
		logger.Debug("FTS disabled, skipping full text copy task.")
		return task.StatusCompleted, nil
	}

	var st FullTextCopyTaskState
	err := json.Unmarshal([]byte(t.State()), &st)
	if err != nil {
		return task.StatusError, fmt.Errorf("failed to unmarshal state: %s (%w)", err, queue.CriticalErr)
	}

	// Re-read the file so a task queued for an outdated entity is dropped.
	current, err := fm.Get(ctx, st.Uri, dbfs.WithFilePublicMetadata())
	if err != nil {
		return task.StatusError, fmt.Errorf("failed to get latest file: %w", err)
	}

	if current.PrimaryEntityID() != st.EntityID {
		logger.Debug("File %d entity changed, skipping copy index.", st.FileID)
		return task.StatusCompleted, nil
	}

	indexer := dep.SearchIndexer(ctx)
	copyErr := indexer.CopyByFileID(ctx, st.OriginalFileID, st.FileID, st.OwnerID, st.EntityID)
	if copyErr != nil {
		// Copy is only a shortcut; a failure is recovered by indexing from scratch.
		logger.Warning("Failed to copy index from file %d to %d, falling back to full indexing: %s", st.OriginalFileID, st.FileID, copyErr)
		return performIndexing(ctx, fm, st.Uri, st.EntityID, st.FileID, st.OwnerID, current.Name(), false)
	}

	// Record on the file which entity its index now reflects.
	indexedMark := fs.MetadataPatch{
		Key:   dbfs.FullTextIndexKey,
		Value: hashid.EncodeEntityID(fm.hasher, st.EntityID),
	}
	if err := fm.fs.PatchMetadata(ctx, []*fs.URI{st.Uri}, indexedMark); err != nil {
		return task.StatusError, fmt.Errorf("failed to patch metadata: %w", err)
	}

	logger.Debug("Successfully copied index from file %d to %d.", st.OriginalFileID, st.FileID)
	return task.StatusCompleted, nil
}
// FullTextChangeOwnerTask is a queue task that reassigns the search index of a
// file from one owner to another. It embeds *queue.DBTask.
type FullTextChangeOwnerTask struct {
	*queue.DBTask
}

// FullTextChangeOwnerTaskState is the JSON-serialized private state of a
// FullTextChangeOwnerTask. Uri and EntityID identify the file and the entity
// the task was created for; OriginalOwnerID and NewOwnerID are the owners the
// index moves between.
type FullTextChangeOwnerTaskState struct {
	Uri             *fs.URI `json:"uri"`
	EntityID        int     `json:"entity_id"`
	FileID          int     `json:"file_id"`
	OriginalOwnerID int     `json:"original_owner_id"`
	NewOwnerID      int     `json:"new_owner_id"`
}
// NewFullTextChangeOwnerTask builds a FullTextChangeOwnerTask that moves the
// index of fileID from originalOwnerID to newOwnerID.
//
// All identifiers and the uri are JSON-encoded into the task's private state;
// creator becomes the task's direct owner. An error is returned only when the
// state cannot be marshaled.
func NewFullTextChangeOwnerTask(ctx context.Context, uri *fs.URI, entityID, fileID, originalOwnerID, newOwnerID int, creator *ent.User) (*FullTextChangeOwnerTask, error) {
	payload, err := json.Marshal(&FullTextChangeOwnerTaskState{
		Uri:             uri,
		EntityID:        entityID,
		FileID:          fileID,
		OriginalOwnerID: originalOwnerID,
		NewOwnerID:      newOwnerID,
	})
	if err != nil {
		return nil, fmt.Errorf("failed to marshal state: %w", err)
	}

	model := &ent.Task{
		Type:          queue.FullTextChangeOwnerTaskType,
		CorrelationID: logging.CorrelationID(ctx),
		PrivateState:  string(payload),
		PublicState:   &types.TaskPublicState{},
	}

	return &FullTextChangeOwnerTask{
		DBTask: &queue.DBTask{
			DirectOwner: creator,
			Task:        model,
		},
	}, nil
}
// NewFullTextChangeOwnerTaskFromModel wraps a stored task model in a
// FullTextChangeOwnerTask. It is the factory registered for
// queue.FullTextChangeOwnerTaskType.
func NewFullTextChangeOwnerTaskFromModel(t *ent.Task) queue.Task {
	base := &queue.DBTask{Task: t}
	return &FullTextChangeOwnerTask{DBTask: base}
}
// Do reassigns the search index of a file to its new owner.
//
// The task completes without doing anything when FTS is disabled or when the
// file's primary entity no longer matches the one recorded in the task state.
// A failure of the indexer's ChangeOwner call is reported as a task error.
func (t *FullTextChangeOwnerTask) Do(ctx context.Context) (task.Status, error) {
	dep := dependency.FromContext(ctx)
	logger := dep.Logger()
	fm := NewFileManager(dep, inventory.UserFromContext(ctx)).(*manager)

	if ftsOn := fm.settings.FTSEnabled(ctx); !ftsOn {
		logger.Debug("FTS disabled, skipping full text change owner task.")
		return task.StatusCompleted, nil
	}

	var st FullTextChangeOwnerTaskState
	err := json.Unmarshal([]byte(t.State()), &st)
	if err != nil {
		return task.StatusError, fmt.Errorf("failed to unmarshal state: %s (%w)", err, queue.CriticalErr)
	}

	// Re-read the file so a task queued for an outdated entity is dropped.
	current, err := fm.Get(ctx, st.Uri, dbfs.WithFilePublicMetadata())
	if err != nil {
		return task.StatusError, fmt.Errorf("failed to get latest file: %w", err)
	}

	if current.PrimaryEntityID() != st.EntityID {
		logger.Debug("File %d entity changed, skipping owner change.", st.FileID)
		return task.StatusCompleted, nil
	}

	err = dep.SearchIndexer(ctx).ChangeOwner(ctx, st.FileID, st.OriginalOwnerID, st.NewOwnerID)
	if err != nil {
		return task.StatusError, fmt.Errorf("failed to change owner for file %d: %w", st.FileID, err)
	}

	logger.Debug("Successfully changed index owner for file %d from %d to %d.", st.FileID, st.OriginalOwnerID, st.NewOwnerID)
	return task.StatusCompleted, nil
}
// FullTextDeleteTask is a queue task that removes the search index entries of
// one or more files. It embeds *queue.DBTask.
type FullTextDeleteTask struct {
	*queue.DBTask
}

// FullTextDeleteTaskState is the JSON-serialized private state of a
// FullTextDeleteTask. FileIDs lists the files whose index is deleted.
type FullTextDeleteTaskState struct {
	FileIDs []int `json:"file_ids"`
}
// NewFullTextDeleteTask builds a FullTextDeleteTask that removes the index of
// the given files.
//
// fileIDs is JSON-encoded into the task's private state; creator becomes the
// task's direct owner. An error is returned only when the state cannot be
// marshaled.
func NewFullTextDeleteTask(ctx context.Context, fileIDs []int, creator *ent.User) (*FullTextDeleteTask, error) {
	payload, err := json.Marshal(&FullTextDeleteTaskState{FileIDs: fileIDs})
	if err != nil {
		return nil, fmt.Errorf("failed to marshal state: %w", err)
	}

	model := &ent.Task{
		Type:          queue.FullTextDeleteTaskType,
		CorrelationID: logging.CorrelationID(ctx),
		PrivateState:  string(payload),
		PublicState:   &types.TaskPublicState{},
	}

	return &FullTextDeleteTask{
		DBTask: &queue.DBTask{
			DirectOwner: creator,
			Task:        model,
		},
	}, nil
}
// NewFullTextDeleteTaskFromModel wraps a stored task model in a
// FullTextDeleteTask. It is the factory registered for
// queue.FullTextDeleteTaskType.
func NewFullTextDeleteTaskFromModel(t *ent.Task) queue.Task {
	base := &queue.DBTask{Task: t}
	return &FullTextDeleteTask{DBTask: base}
}
// Do removes the search index entries of every file ID stored in the task
// state. A failure of the indexer's DeleteByFileIDs call is reported as a task
// error.
func (t *FullTextDeleteTask) Do(ctx context.Context) (task.Status, error) {
	dep := dependency.FromContext(ctx)
	logger := dep.Logger()

	var st FullTextDeleteTaskState
	err := json.Unmarshal([]byte(t.State()), &st)
	if err != nil {
		return task.StatusError, fmt.Errorf("failed to unmarshal state: %s (%w)", err, queue.CriticalErr)
	}

	err = dep.SearchIndexer(ctx).DeleteByFileIDs(ctx, st.FileIDs...)
	if err != nil {
		return task.StatusError, fmt.Errorf("failed to delete index for %d file(s): %w", len(st.FileIDs), err)
	}

	logger.Debug("Successfully deleted index for %d file(s).", len(st.FileIDs))
	return task.StatusCompleted, nil
}
// Do indexes the file described by the task state for full-text search.
//
// The task completes without doing anything when FTS is disabled or when the
// file's primary entity no longer matches the one recorded in the task state.
// If the file already carries the dbfs.FullTextIndexKey metadata, its existing
// index chunks are deleted before the new ones are written.
func (t *FullTextIndexTask) Do(ctx context.Context) (task.Status, error) {
	dep := dependency.FromContext(ctx)
	logger := dep.Logger()
	fm := NewFileManager(dep, inventory.UserFromContext(ctx)).(*manager)

	if ftsOn := fm.settings.FTSEnabled(ctx); !ftsOn {
		logger.Debug("FTS disabled, skipping full text index task.")
		return task.StatusCompleted, nil
	}

	var st FullTextIndexTaskState
	err := json.Unmarshal([]byte(t.State()), &st)
	if err != nil {
		return task.StatusError, fmt.Errorf("failed to unmarshal state: %s (%w)", err, queue.CriticalErr)
	}

	// Re-read the file so a task queued for an outdated entity is dropped.
	current, err := fm.Get(ctx, st.Uri, dbfs.WithFilePublicMetadata())
	if err != nil {
		return task.StatusError, fmt.Errorf("failed to get latest file: %w", err)
	}

	if current.PrimaryEntityID() != st.EntityID {
		logger.Debug("File %d is not the latest version, skipping indexing.", st.FileID)
		return task.StatusCompleted, nil
	}

	// Presence of the index marker means older chunks exist and must be cleared.
	_, alreadyIndexed := current.Metadata()[dbfs.FullTextIndexKey]

	return performIndexing(ctx, fm, st.Uri, st.EntityID, st.FileID, st.OwnerID, st.Uri.Name(), alreadyIndexed)
}
// performIndexing reads the entity's content, extracts its text and writes it
// to the search index, then marks the file as indexed via metadata. Both the
// regular index task and the copy task (as its fallback) go through here.
//
// A text extraction failure is logged and the task is still reported as
// completed. When deleteOldChunks is set, existing chunks of the file are
// removed first; a failure of that removal is only logged. Empty entities are
// indexed with empty text without invoking the extractor.
func performIndexing(ctx context.Context, fm *manager, uri *fs.URI, entityID, fileID, ownerID int, fileName string, deleteOldChunks bool) (task.Status, error) {
	dep := dependency.FromContext(ctx)
	logger := dep.Logger()

	source, err := fm.GetEntitySource(ctx, entityID)
	if err != nil {
		return task.StatusError, fmt.Errorf("failed to get entity source: %w", err)
	}
	defer source.Close()

	// Only non-empty entities are handed to the extractor.
	content := ""
	if source.Entity().Size() > 0 {
		content, err = dep.TextExtractor(ctx).Extract(ctx, source)
		if err != nil {
			logger.Warning("Failed to extract text for file %d: %s", fileID, err)
			return task.StatusCompleted, nil
		}
	}

	indexer := dep.SearchIndexer(ctx)

	// Clear chunks left by a previous, possibly longer, version of the file so
	// that none of them survive the upsert below.
	if deleteOldChunks {
		if delErr := indexer.DeleteByFileIDs(ctx, fileID); delErr != nil {
			logger.Warning("Failed to delete old index chunks for file %d: %s", fileID, delErr)
		}
	}

	if err = indexer.IndexFile(ctx, ownerID, fileID, entityID, fileName, content); err != nil {
		return task.StatusError, fmt.Errorf("failed to index file %d: %w", fileID, err)
	}

	// Record on the file which entity its index now reflects.
	indexedMark := fs.MetadataPatch{
		Key:   dbfs.FullTextIndexKey,
		Value: hashid.EncodeEntityID(fm.hasher, entityID),
	}
	if err = fm.fs.PatchMetadata(ctx, []*fs.URI{uri}, indexedMark); err != nil {
		return task.StatusError, fmt.Errorf("failed to patch metadata: %w", err)
	}

	logger.Debug("Successfully indexed file %d for owner %d.", fileID, ownerID)
	return task.StatusCompleted, nil
}
// ShouldExtractText reports whether text should be extracted from a file: its
// name must match one of the extractor's supported extensions and its size must
// be strictly below the extractor's maximum file size. It is exported for use
// by the rebuild index workflow.
func ShouldExtractText(extractor searcher.TextExtractor, fileName string, size int64) bool {
	if !util.IsInExtensionList(extractor.Exts(), fileName) {
		return false
	}

	return size < extractor.MaxFileSize()
}
// shouldIndexFullText reports whether a file should be indexed for full-text
// search: FTS must be enabled and the file must pass ShouldExtractText for the
// configured text extractor.
func (m *manager) shouldIndexFullText(ctx context.Context, fileName string, size int64) bool {
	return m.settings.FTSEnabled(ctx) &&
		ShouldExtractText(m.dep.TextExtractor(ctx), fileName, size)
}
// fullTextIndexForNewEntity queues a full text index task for an entity that
// was just uploaded through the given session.
//
// Nothing is queued when the session declares an entity type other than
// types.EntityTypeVersion, or when the file is not eligible according to
// shouldIndexFullText. Failures to create or queue the task are only logged.
func (m *manager) fullTextIndexForNewEntity(ctx context.Context, session *fs.UploadSession, owner int) {
	props := session.Props

	if kind := props.EntityType; kind != nil && *kind != types.EntityTypeVersion {
		return
	}

	eligible := m.shouldIndexFullText(ctx, props.Uri.Name(), props.Size)
	if !eligible {
		return
	}

	indexTask, err := NewFullTextIndexTask(ctx, props.Uri, session.EntityID, session.FileID, owner, m.user)
	if err != nil {
		m.l.Warning("Failed to create full text index task: %s", err)
		return
	}

	if err = m.dep.MediaMetaQueue(ctx).QueueTask(ctx, indexTask); err != nil {
		m.l.Warning("Failed to queue full text index task: %s", err)
	}
}
// processIndexDiff applies the search-index changes described by diff.
//
// Updates, copies and owner changes are each turned into a task and queued on
// the media meta queue. Deletions are batched into a single delete task, which
// is queued only when FTS is enabled. Renames are applied directly against the
// search indexer in a background goroutine. Every failure is logged and never
// stops the remaining work. A nil diff is a no-op.
func (m *manager) processIndexDiff(ctx context.Context, diff *fs.IndexDiff) {
	if diff == nil {
		return
	}

	for _, update := range diff.IndexToUpdate {
		t, err := NewFullTextIndexTask(ctx, &update.Uri, update.EntityID, update.FileID, update.OwnerID, m.user)
		if err != nil {
			m.l.Warning("Failed to create full text update task: %s", err)
			continue
		}
		if err := m.dep.MediaMetaQueue(ctx).QueueTask(ctx, t); err != nil {
			m.l.Warning("Failed to queue full text update task: %s", err)
		}
	}

	for _, cp := range diff.IndexToCopy {
		t, err := NewFullTextCopyTask(ctx, &cp.Uri, cp.OriginalFileID, cp.FileID, cp.OwnerID, cp.EntityID, m.user)
		if err != nil {
			m.l.Warning("Failed to create full text copy task: %s", err)
			continue
		}
		if err := m.dep.MediaMetaQueue(ctx).QueueTask(ctx, t); err != nil {
			m.l.Warning("Failed to queue full text copy task: %s", err)
		}
	}

	for _, change := range diff.IndexToChangeOwner {
		t, err := NewFullTextChangeOwnerTask(ctx, &change.Uri, change.EntityID, change.FileID, change.OriginalOwnerID, change.NewOwnerID, m.user)
		if err != nil {
			m.l.Warning("Failed to create full text change owner task: %s", err)
			continue
		}
		if err := m.dep.MediaMetaQueue(ctx).QueueTask(ctx, t); err != nil {
			m.l.Warning("Failed to queue full text change owner task: %s", err)
		}
	}

	if len(diff.IndexToDelete) > 0 && m.dep.SettingProvider().FTSEnabled(ctx) {
		// A failure here must not return from the function: the renames below
		// are independent of the delete task and still have to be applied.
		t, err := NewFullTextDeleteTask(ctx, diff.IndexToDelete, m.user)
		if err != nil {
			m.l.Warning("Failed to create full text delete task: %s", err)
		} else if err := m.dep.MediaMetaQueue(ctx).QueueTask(ctx, t); err != nil {
			m.l.Warning("Failed to queue full text delete task: %s", err)
		}
	}

	// Nothing to rename: avoid creating a context, an indexer and a goroutine.
	if len(diff.IndexToRename) == 0 {
		return
	}

	// The renames run after this call returns, so they must not be cancelled
	// together with the caller's context.
	ctx = context.WithoutCancel(ctx)
	indexer := m.dep.SearchIndexer(ctx)
	go func() {
		for _, rename := range diff.IndexToRename {
			if err := indexer.Rename(ctx, rename.FileID, rename.EntityID, rename.Uri.Name()); err != nil {
				m.l.Warning("Failed to rename index for file %d: %s", rename.FileID, err)
			}
		}
	}()
}