You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
537 lines
16 KiB
537 lines
16 KiB
package manager
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
|
|
"github.com/cloudreve/Cloudreve/v4/application/dependency"
|
|
"github.com/cloudreve/Cloudreve/v4/ent"
|
|
"github.com/cloudreve/Cloudreve/v4/ent/task"
|
|
"github.com/cloudreve/Cloudreve/v4/inventory"
|
|
"github.com/cloudreve/Cloudreve/v4/inventory/types"
|
|
"github.com/cloudreve/Cloudreve/v4/pkg/filemanager/fs"
|
|
"github.com/cloudreve/Cloudreve/v4/pkg/filemanager/fs/dbfs"
|
|
"github.com/cloudreve/Cloudreve/v4/pkg/hashid"
|
|
"github.com/cloudreve/Cloudreve/v4/pkg/logging"
|
|
"github.com/cloudreve/Cloudreve/v4/pkg/queue"
|
|
"github.com/cloudreve/Cloudreve/v4/pkg/searcher"
|
|
"github.com/cloudreve/Cloudreve/v4/pkg/util"
|
|
"github.com/samber/lo"
|
|
)
|
|
|
|
type (
|
|
FullTextIndexTask struct {
|
|
*queue.DBTask
|
|
}
|
|
|
|
FullTextIndexTaskState struct {
|
|
Uri *fs.URI `json:"uri"`
|
|
EntityID int `json:"entity_id"`
|
|
FileID int `json:"file_id"`
|
|
OwnerID int `json:"owner_id"`
|
|
}
|
|
|
|
ftsFileInfo struct {
|
|
FileID int
|
|
OwnerID int
|
|
EntityID int
|
|
FileName string
|
|
}
|
|
)
|
|
|
|
func (m *manager) SearchFullText(ctx context.Context, query string, offset int) (*FullTextSearchResults, error) {
|
|
indexer := m.dep.SearchIndexer(ctx)
|
|
results, total, err := indexer.Search(ctx, m.user.ID, query, offset)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to search full text: %w", err)
|
|
}
|
|
|
|
if len(results) == 0 {
|
|
// No results.
|
|
return &FullTextSearchResults{}, nil
|
|
}
|
|
|
|
// Traverse each file in result
|
|
files := lo.FilterMap(results, func(result searcher.SearchResult, _ int) (FullTextSearchResult, bool) {
|
|
file, err := m.TraverseFile(ctx, result.FileID)
|
|
if err != nil {
|
|
m.l.Debug("Failed to traverse file %d for full text search: %s, skipping.", result.FileID, err)
|
|
return FullTextSearchResult{}, false
|
|
}
|
|
|
|
return FullTextSearchResult{
|
|
File: file,
|
|
Content: result.Text,
|
|
}, true
|
|
})
|
|
|
|
if len(files) == 0 {
|
|
// No valid files, run next offset
|
|
return m.SearchFullText(ctx, query, offset+len(results))
|
|
}
|
|
|
|
return &FullTextSearchResults{
|
|
Hits: files,
|
|
Total: total,
|
|
}, nil
|
|
}
|
|
|
|
func init() {
|
|
queue.RegisterResumableTaskFactory(queue.FullTextIndexTaskType, NewFullTextIndexTaskFromModel)
|
|
queue.RegisterResumableTaskFactory(queue.FullTextCopyTaskType, NewFullTextCopyTaskFromModel)
|
|
queue.RegisterResumableTaskFactory(queue.FullTextChangeOwnerTaskType, NewFullTextChangeOwnerTaskFromModel)
|
|
queue.RegisterResumableTaskFactory(queue.FullTextDeleteTaskType, NewFullTextDeleteTaskFromModel)
|
|
}
|
|
|
|
func NewFullTextIndexTask(ctx context.Context, uri *fs.URI, entityID, fileID, ownerID int, creator *ent.User) (*FullTextIndexTask, error) {
|
|
state := &FullTextIndexTaskState{
|
|
Uri: uri,
|
|
EntityID: entityID,
|
|
FileID: fileID,
|
|
OwnerID: ownerID,
|
|
}
|
|
stateBytes, err := json.Marshal(state)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to marshal state: %w", err)
|
|
}
|
|
|
|
return &FullTextIndexTask{
|
|
DBTask: &queue.DBTask{
|
|
DirectOwner: creator,
|
|
Task: &ent.Task{
|
|
Type: queue.FullTextIndexTaskType,
|
|
CorrelationID: logging.CorrelationID(ctx),
|
|
PrivateState: string(stateBytes),
|
|
PublicState: &types.TaskPublicState{},
|
|
},
|
|
},
|
|
}, nil
|
|
}
|
|
|
|
func NewFullTextIndexTaskFromModel(t *ent.Task) queue.Task {
|
|
return &FullTextIndexTask{
|
|
DBTask: &queue.DBTask{
|
|
Task: t,
|
|
},
|
|
}
|
|
}
|
|
|
|
type (
|
|
FullTextCopyTask struct {
|
|
*queue.DBTask
|
|
}
|
|
|
|
FullTextCopyTaskState struct {
|
|
Uri *fs.URI `json:"uri"`
|
|
OriginalFileID int `json:"original_file_id"`
|
|
FileID int `json:"file_id"`
|
|
OwnerID int `json:"owner_id"`
|
|
EntityID int `json:"entity_id"`
|
|
}
|
|
)
|
|
|
|
func NewFullTextCopyTask(ctx context.Context, uri *fs.URI, originalFileID, fileID, ownerID, entityID int, creator *ent.User) (*FullTextCopyTask, error) {
|
|
state := &FullTextCopyTaskState{
|
|
Uri: uri,
|
|
OriginalFileID: originalFileID,
|
|
FileID: fileID,
|
|
OwnerID: ownerID,
|
|
EntityID: entityID,
|
|
}
|
|
stateBytes, err := json.Marshal(state)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to marshal state: %w", err)
|
|
}
|
|
|
|
return &FullTextCopyTask{
|
|
DBTask: &queue.DBTask{
|
|
DirectOwner: creator,
|
|
Task: &ent.Task{
|
|
Type: queue.FullTextCopyTaskType,
|
|
CorrelationID: logging.CorrelationID(ctx),
|
|
PrivateState: string(stateBytes),
|
|
PublicState: &types.TaskPublicState{},
|
|
},
|
|
},
|
|
}, nil
|
|
}
|
|
|
|
func NewFullTextCopyTaskFromModel(t *ent.Task) queue.Task {
|
|
return &FullTextCopyTask{
|
|
DBTask: &queue.DBTask{
|
|
Task: t,
|
|
},
|
|
}
|
|
}
|
|
|
|
func (t *FullTextCopyTask) Do(ctx context.Context) (task.Status, error) {
|
|
dep := dependency.FromContext(ctx)
|
|
l := dep.Logger()
|
|
fm := NewFileManager(dep, inventory.UserFromContext(ctx)).(*manager)
|
|
|
|
if !fm.settings.FTSEnabled(ctx) {
|
|
l.Debug("FTS disabled, skipping full text copy task.")
|
|
return task.StatusCompleted, nil
|
|
}
|
|
|
|
var state FullTextCopyTaskState
|
|
if err := json.Unmarshal([]byte(t.State()), &state); err != nil {
|
|
return task.StatusError, fmt.Errorf("failed to unmarshal state: %s (%w)", err, queue.CriticalErr)
|
|
}
|
|
|
|
// Get fresh file to make sure task is not stale.
|
|
file, err := fm.Get(ctx, state.Uri, dbfs.WithFilePublicMetadata())
|
|
if err != nil {
|
|
return task.StatusError, fmt.Errorf("failed to get latest file: %w", err)
|
|
}
|
|
|
|
if file.PrimaryEntityID() != state.EntityID {
|
|
l.Debug("File %d entity changed, skipping copy index.", state.FileID)
|
|
return task.StatusCompleted, nil
|
|
}
|
|
|
|
indexer := dep.SearchIndexer(ctx)
|
|
if err := indexer.CopyByFileID(ctx, state.OriginalFileID, state.FileID, state.OwnerID, state.EntityID); err != nil {
|
|
l.Warning("Failed to copy index from file %d to %d, falling back to full indexing: %s", state.OriginalFileID, state.FileID, err)
|
|
return performIndexing(ctx, fm, state.Uri, state.EntityID, state.FileID, state.OwnerID, file.Name(), false)
|
|
}
|
|
|
|
// Patch metadata to mark file as indexed.
|
|
if err := fm.fs.PatchMetadata(ctx, []*fs.URI{state.Uri}, fs.MetadataPatch{
|
|
Key: dbfs.FullTextIndexKey,
|
|
Value: hashid.EncodeEntityID(fm.hasher, state.EntityID),
|
|
}); err != nil {
|
|
return task.StatusError, fmt.Errorf("failed to patch metadata: %w", err)
|
|
}
|
|
|
|
l.Debug("Successfully copied index from file %d to %d.", state.OriginalFileID, state.FileID)
|
|
return task.StatusCompleted, nil
|
|
}
|
|
|
|
type (
|
|
FullTextChangeOwnerTask struct {
|
|
*queue.DBTask
|
|
}
|
|
|
|
FullTextChangeOwnerTaskState struct {
|
|
Uri *fs.URI `json:"uri"`
|
|
EntityID int `json:"entity_id"`
|
|
FileID int `json:"file_id"`
|
|
OriginalOwnerID int `json:"original_owner_id"`
|
|
NewOwnerID int `json:"new_owner_id"`
|
|
}
|
|
)
|
|
|
|
func NewFullTextChangeOwnerTask(ctx context.Context, uri *fs.URI, entityID, fileID, originalOwnerID, newOwnerID int, creator *ent.User) (*FullTextChangeOwnerTask, error) {
|
|
state := &FullTextChangeOwnerTaskState{
|
|
Uri: uri,
|
|
EntityID: entityID,
|
|
FileID: fileID,
|
|
OriginalOwnerID: originalOwnerID,
|
|
NewOwnerID: newOwnerID,
|
|
}
|
|
stateBytes, err := json.Marshal(state)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to marshal state: %w", err)
|
|
}
|
|
|
|
return &FullTextChangeOwnerTask{
|
|
DBTask: &queue.DBTask{
|
|
DirectOwner: creator,
|
|
Task: &ent.Task{
|
|
Type: queue.FullTextChangeOwnerTaskType,
|
|
CorrelationID: logging.CorrelationID(ctx),
|
|
PrivateState: string(stateBytes),
|
|
PublicState: &types.TaskPublicState{},
|
|
},
|
|
},
|
|
}, nil
|
|
}
|
|
|
|
func NewFullTextChangeOwnerTaskFromModel(t *ent.Task) queue.Task {
|
|
return &FullTextChangeOwnerTask{
|
|
DBTask: &queue.DBTask{
|
|
Task: t,
|
|
},
|
|
}
|
|
}
|
|
|
|
func (t *FullTextChangeOwnerTask) Do(ctx context.Context) (task.Status, error) {
|
|
dep := dependency.FromContext(ctx)
|
|
l := dep.Logger()
|
|
fm := NewFileManager(dep, inventory.UserFromContext(ctx)).(*manager)
|
|
|
|
if !fm.settings.FTSEnabled(ctx) {
|
|
l.Debug("FTS disabled, skipping full text change owner task.")
|
|
return task.StatusCompleted, nil
|
|
}
|
|
|
|
var state FullTextChangeOwnerTaskState
|
|
if err := json.Unmarshal([]byte(t.State()), &state); err != nil {
|
|
return task.StatusError, fmt.Errorf("failed to unmarshal state: %s (%w)", err, queue.CriticalErr)
|
|
}
|
|
|
|
// Get fresh file to make sure task is not stale.
|
|
file, err := fm.Get(ctx, state.Uri, dbfs.WithFilePublicMetadata())
|
|
if err != nil {
|
|
return task.StatusError, fmt.Errorf("failed to get latest file: %w", err)
|
|
}
|
|
|
|
if file.PrimaryEntityID() != state.EntityID {
|
|
l.Debug("File %d entity changed, skipping owner change.", state.FileID)
|
|
return task.StatusCompleted, nil
|
|
}
|
|
|
|
indexer := dep.SearchIndexer(ctx)
|
|
if err := indexer.ChangeOwner(ctx, state.FileID, state.OriginalOwnerID, state.NewOwnerID); err != nil {
|
|
return task.StatusError, fmt.Errorf("failed to change owner for file %d: %w", state.FileID, err)
|
|
}
|
|
|
|
l.Debug("Successfully changed index owner for file %d from %d to %d.", state.FileID, state.OriginalOwnerID, state.NewOwnerID)
|
|
return task.StatusCompleted, nil
|
|
}
|
|
|
|
type (
|
|
FullTextDeleteTask struct {
|
|
*queue.DBTask
|
|
}
|
|
|
|
FullTextDeleteTaskState struct {
|
|
FileIDs []int `json:"file_ids"`
|
|
}
|
|
)
|
|
|
|
func NewFullTextDeleteTask(ctx context.Context, fileIDs []int, creator *ent.User) (*FullTextDeleteTask, error) {
|
|
state := &FullTextDeleteTaskState{
|
|
FileIDs: fileIDs,
|
|
}
|
|
stateBytes, err := json.Marshal(state)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to marshal state: %w", err)
|
|
}
|
|
|
|
return &FullTextDeleteTask{
|
|
DBTask: &queue.DBTask{
|
|
DirectOwner: creator,
|
|
Task: &ent.Task{
|
|
Type: queue.FullTextDeleteTaskType,
|
|
CorrelationID: logging.CorrelationID(ctx),
|
|
PrivateState: string(stateBytes),
|
|
PublicState: &types.TaskPublicState{},
|
|
},
|
|
},
|
|
}, nil
|
|
}
|
|
|
|
func NewFullTextDeleteTaskFromModel(t *ent.Task) queue.Task {
|
|
return &FullTextDeleteTask{
|
|
DBTask: &queue.DBTask{
|
|
Task: t,
|
|
},
|
|
}
|
|
}
|
|
|
|
func (t *FullTextDeleteTask) Do(ctx context.Context) (task.Status, error) {
|
|
dep := dependency.FromContext(ctx)
|
|
l := dep.Logger()
|
|
|
|
var state FullTextDeleteTaskState
|
|
if err := json.Unmarshal([]byte(t.State()), &state); err != nil {
|
|
return task.StatusError, fmt.Errorf("failed to unmarshal state: %s (%w)", err, queue.CriticalErr)
|
|
}
|
|
|
|
indexer := dep.SearchIndexer(ctx)
|
|
if err := indexer.DeleteByFileIDs(ctx, state.FileIDs...); err != nil {
|
|
return task.StatusError, fmt.Errorf("failed to delete index for %d file(s): %w", len(state.FileIDs), err)
|
|
}
|
|
|
|
l.Debug("Successfully deleted index for %d file(s).", len(state.FileIDs))
|
|
return task.StatusCompleted, nil
|
|
}
|
|
|
|
func (t *FullTextIndexTask) Do(ctx context.Context) (task.Status, error) {
|
|
dep := dependency.FromContext(ctx)
|
|
l := dep.Logger()
|
|
fm := NewFileManager(dep, inventory.UserFromContext(ctx)).(*manager)
|
|
|
|
// Check FTS enabled
|
|
if !fm.settings.FTSEnabled(ctx) {
|
|
l.Debug("FTS disabled, skipping full text index task.")
|
|
return task.StatusCompleted, nil
|
|
}
|
|
|
|
// Unmarshal state
|
|
var state FullTextIndexTaskState
|
|
if err := json.Unmarshal([]byte(t.State()), &state); err != nil {
|
|
return task.StatusError, fmt.Errorf("failed to unmarshal state: %s (%w)", err, queue.CriticalErr)
|
|
}
|
|
|
|
// Get fresh file to make sure task is not stale
|
|
file, err := fm.Get(ctx, state.Uri, dbfs.WithFilePublicMetadata())
|
|
if err != nil {
|
|
return task.StatusError, fmt.Errorf("failed to get latest file: %w", err)
|
|
}
|
|
|
|
if file.PrimaryEntityID() != state.EntityID {
|
|
l.Debug("File %d is not the latest version, skipping indexing.", state.FileID)
|
|
return task.StatusCompleted, nil
|
|
}
|
|
|
|
deleteOldChunks := false
|
|
if _, ok := file.Metadata()[dbfs.FullTextIndexKey]; ok {
|
|
deleteOldChunks = true
|
|
}
|
|
|
|
return performIndexing(ctx, fm, state.Uri, state.EntityID, state.FileID, state.OwnerID, state.Uri.Name(), deleteOldChunks)
|
|
}
|
|
|
|
// performIndexing extracts text from the entity and indexes it. This is shared between
|
|
// the regular index task and the copy task (as a fallback when copy fails).
|
|
func performIndexing(ctx context.Context, fm *manager, uri *fs.URI, entityID, fileID, ownerID int, fileName string, deleteOldChunks bool) (task.Status, error) {
|
|
dep := dependency.FromContext(ctx)
|
|
l := dep.Logger()
|
|
|
|
// Get entity source
|
|
source, err := fm.GetEntitySource(ctx, entityID)
|
|
if err != nil {
|
|
return task.StatusError, fmt.Errorf("failed to get entity source: %w", err)
|
|
}
|
|
defer source.Close()
|
|
|
|
// Extract text
|
|
var text string
|
|
if source.Entity().Size() > 0 {
|
|
extractor := dep.TextExtractor(ctx)
|
|
text, err = extractor.Extract(ctx, source)
|
|
if err != nil {
|
|
l.Warning("Failed to extract text for file %d: %s", fileID, err)
|
|
return task.StatusCompleted, nil
|
|
}
|
|
}
|
|
|
|
indexer := dep.SearchIndexer(ctx)
|
|
|
|
// Delete old chunks first so that stale chunks from a previously longer
|
|
// version of the file are removed before upserting the new (possibly fewer)
|
|
// chunks.
|
|
if deleteOldChunks {
|
|
if err := indexer.DeleteByFileIDs(ctx, fileID); err != nil {
|
|
l.Warning("Failed to delete old index chunks for file %d: %s", fileID, err)
|
|
}
|
|
}
|
|
|
|
// Index via SearchIndexer
|
|
if err := indexer.IndexFile(ctx, ownerID, fileID, entityID, fileName, text); err != nil {
|
|
return task.StatusError, fmt.Errorf("failed to index file %d: %w", fileID, err)
|
|
}
|
|
|
|
// Upsert metadata
|
|
if err := fm.fs.PatchMetadata(ctx, []*fs.URI{uri}, fs.MetadataPatch{
|
|
Key: dbfs.FullTextIndexKey,
|
|
Value: hashid.EncodeEntityID(fm.hasher, entityID),
|
|
}); err != nil {
|
|
return task.StatusError, fmt.Errorf("failed to patch metadata: %w", err)
|
|
}
|
|
|
|
l.Debug("Successfully indexed file %d for owner %d.", fileID, ownerID)
|
|
return task.StatusCompleted, nil
|
|
}
|
|
|
|
// ShouldExtractText checks if a file is eligible for text extraction based on
|
|
// the extractor's supported extensions and max file size. This is exported for
|
|
// use by the rebuild index workflow.
|
|
func ShouldExtractText(extractor searcher.TextExtractor, fileName string, size int64) bool {
|
|
return util.IsInExtensionList(extractor.Exts(), fileName) && extractor.MaxFileSize() > size
|
|
}
|
|
|
|
// shouldIndexFullText checks if a file should be indexed for full-text search.
|
|
func (m *manager) shouldIndexFullText(ctx context.Context, fileName string, size int64) bool {
|
|
if !m.settings.FTSEnabled(ctx) {
|
|
return false
|
|
}
|
|
|
|
extractor := m.dep.TextExtractor(ctx)
|
|
return ShouldExtractText(extractor, fileName, size)
|
|
}
|
|
|
|
// fullTextIndexForNewEntity creates and queues a full text index task for a newly uploaded entity.
|
|
func (m *manager) fullTextIndexForNewEntity(ctx context.Context, session *fs.UploadSession, owner int) {
|
|
if session.Props.EntityType != nil && *session.Props.EntityType != types.EntityTypeVersion {
|
|
return
|
|
}
|
|
|
|
if !m.shouldIndexFullText(ctx, session.Props.Uri.Name(), session.Props.Size) {
|
|
return
|
|
}
|
|
|
|
t, err := NewFullTextIndexTask(ctx, session.Props.Uri, session.EntityID, session.FileID, owner, m.user)
|
|
if err != nil {
|
|
m.l.Warning("Failed to create full text index task: %s", err)
|
|
return
|
|
}
|
|
if err := m.dep.MediaMetaQueue(ctx).QueueTask(ctx, t); err != nil {
|
|
m.l.Warning("Failed to queue full text index task: %s", err)
|
|
}
|
|
}
|
|
|
|
func (m *manager) processIndexDiff(ctx context.Context, diff *fs.IndexDiff) {
|
|
if diff == nil {
|
|
return
|
|
}
|
|
|
|
for _, update := range diff.IndexToUpdate {
|
|
t, err := NewFullTextIndexTask(ctx, &update.Uri, update.EntityID, update.FileID, update.OwnerID, m.user)
|
|
if err != nil {
|
|
m.l.Warning("Failed to create full text update task: %s", err)
|
|
continue
|
|
}
|
|
if err := m.dep.MediaMetaQueue(ctx).QueueTask(ctx, t); err != nil {
|
|
m.l.Warning("Failed to queue full text update task: %s", err)
|
|
}
|
|
}
|
|
|
|
for _, cp := range diff.IndexToCopy {
|
|
t, err := NewFullTextCopyTask(ctx, &cp.Uri, cp.OriginalFileID, cp.FileID, cp.OwnerID, cp.EntityID, m.user)
|
|
if err != nil {
|
|
m.l.Warning("Failed to create full text copy task: %s", err)
|
|
continue
|
|
}
|
|
if err := m.dep.MediaMetaQueue(ctx).QueueTask(ctx, t); err != nil {
|
|
m.l.Warning("Failed to queue full text copy task: %s", err)
|
|
}
|
|
}
|
|
|
|
for _, change := range diff.IndexToChangeOwner {
|
|
t, err := NewFullTextChangeOwnerTask(ctx, &change.Uri, change.EntityID, change.FileID, change.OriginalOwnerID, change.NewOwnerID, m.user)
|
|
if err != nil {
|
|
m.l.Warning("Failed to create full text change owner task: %s", err)
|
|
continue
|
|
}
|
|
if err := m.dep.MediaMetaQueue(ctx).QueueTask(ctx, t); err != nil {
|
|
m.l.Warning("Failed to queue full text change owner task: %s", err)
|
|
}
|
|
}
|
|
|
|
if len(diff.IndexToDelete) > 0 && m.dep.SettingProvider().FTSEnabled(ctx) {
|
|
t, err := NewFullTextDeleteTask(ctx, diff.IndexToDelete, m.user)
|
|
if err != nil {
|
|
m.l.Warning("Failed to create full text delete task: %s", err)
|
|
return
|
|
}
|
|
if err := m.dep.MediaMetaQueue(ctx).QueueTask(ctx, t); err != nil {
|
|
m.l.Warning("Failed to queue full text delete task: %s", err)
|
|
}
|
|
}
|
|
|
|
ctx = context.WithoutCancel(ctx)
|
|
indexer := m.dep.SearchIndexer(ctx)
|
|
go func() {
|
|
for _, rename := range diff.IndexToRename {
|
|
if err := indexer.Rename(ctx, rename.FileID, rename.EntityID, rename.Uri.Name()); err != nil {
|
|
m.l.Warning("Failed to rename index for file %d: %s", rename.FileID, err)
|
|
}
|
|
}
|
|
}()
|
|
}
|