diff --git a/application/dependency/dependency.go b/application/dependency/dependency.go
index aaab6e18..d9963780 100644
--- a/application/dependency/dependency.go
+++ b/application/dependency/dependency.go
@@ -648,6 +648,7 @@ func (d *dependency) MediaMetaQueue(ctx context.Context) queue.Queue {
 			queue.MediaMetaTaskType,
 			queue.FullTextIndexTaskType,
 			queue.FullTextDeleteTaskType,
+			queue.FullTextRebuildTaskType,
 			queue.FullTextCopyTaskType,
 			queue.FullTextChangeOwnerTaskType,
 		),
diff --git a/assets b/assets
index 4a38a946..e3e7fd33 160000
--- a/assets
+++ b/assets
@@ -1 +1 @@
-Subproject commit 4a38a946cb40c3fe9878af0945c33a8b6987637b
+Subproject commit e3e7fd331c869ea67393a5ac5d96ee9fbd8e30ab
diff --git a/inventory/file.go b/inventory/file.go
index ed7047aa..3b344bd9 100644
--- a/inventory/file.go
+++ b/inventory/file.go
@@ -209,6 +209,11 @@ type FileClient interface {
 	UnlinkEntity(ctx context.Context, entity *ent.Entity, file *ent.File, owner *ent.User) (StorageDiff, error)
 	// CreateDirectLink creates a direct link for a file
 	CreateDirectLink(ctx context.Context, fileID int, name string, speed int, reuse bool) (*ent.DirectLink, error)
+	// CountIndexableFiles counts files suitable for FTS indexing (file type, non-empty name, non-zero size, has parent).
+	CountIndexableFiles(ctx context.Context) (int, error)
+	// ListIndexableFiles lists files suitable for FTS indexing, returning up to limit files
+	// with ID strictly greater than afterID. Use afterID=0 to start from the beginning.
+	ListIndexableFiles(ctx context.Context, afterID, limit int) ([]*ent.File, error)
 	// CountByTimeRange counts files created in a given time range
 	CountByTimeRange(ctx context.Context, start, end *time.Time) (int, error)
 	// CountEntityByTimeRange counts entities created in a given time range
@@ -229,6 +234,8 @@ type FileClient interface {
 	UpdateProps(ctx context.Context, file *ent.File, props *types.FileProps) (*ent.File, error)
 	// UpdateModifiedAt updates modified at of a file
 	UpdateModifiedAt(ctx context.Context, file *ent.File, modifiedAt time.Time) error
+	// DeleteAllMetadataByName deletes all metadata entries with the given name
+	DeleteAllMetadataByName(ctx context.Context, name string) error
 }
 
 func NewFileClient(client *ent.Client, dbType conf.DBType, hasher hashid.Encoder) FileClient {
@@ -314,6 +321,36 @@ func (f *fileClient) CountByTimeRange(ctx context.Context, start, end *time.Time
 	return f.client.File.Query().Where(file.CreatedAtGTE(*start), file.CreatedAtLT(*end)).Count(ctx)
 }
 
+func (f *fileClient) DeleteAllMetadataByName(ctx context.Context, name string) error {
+	_, err := f.client.Metadata.Delete().Where(metadata.Name(name)).Exec(schema.SkipSoftDelete(ctx))
+	if err != nil {
+		return fmt.Errorf("failed to delete metadata: %w", err)
+	}
+
+	return nil
+}
+
+func (f *fileClient) CountIndexableFiles(ctx context.Context) (int, error) {
+	return f.indexableFilesQuery().Count(ctx)
+}
+
+func (f *fileClient) ListIndexableFiles(ctx context.Context, afterID, limit int) ([]*ent.File, error) {
+	q := f.indexableFilesQuery()
+	if afterID > 0 {
+		q = q.Where(file.IDGT(afterID))
+	}
+	return q.Limit(limit).All(ctx)
+}
+
+func (f *fileClient) indexableFilesQuery() *ent.FileQuery {
+	return f.client.File.Query().Where(
+		file.Type(int(types.FileTypeFile)),
+		file.NameNEQ(""),
+		file.SizeGT(0),
+		file.FileChildrenNotNil(),
+	).Order(file.ByID())
+}
+
 func (f *fileClient) CountEntityByTimeRange(ctx context.Context, start, end *time.Time) (int, error) {
 	if start == nil || end == nil {
 		return f.client.Entity.Query().Count(ctx)
diff --git a/inventory/setting.go b/inventory/setting.go
index 65b70765..f3f8d4b4 100644
--- a/inventory/setting.go
+++ b/inventory/setting.go
@@ -673,15 +673,14 @@ var DefaultSettings = map[string]string{
 	"fs_event_push_max_age": "1209600",
 	"fs_event_push_debounce": "5",
 	"fts_enabled": "0",
-	"fts_index_type": "",
-	"fts_extractor_type": "",
+	"fts_index_type": "meilisearch",
+	"fts_extractor_type": "tika",
 	"fts_meilisearch_endpoint": "",
 	"fts_meilisearch_api_key": "",
 	"fts_meilisearch_page_size": "5",
 	"fts_meilisearch_embed_enabled": "0",
 	"fts_meilisearch_embed_config": "{}",
 	"fts_tika_endpoint": "",
-	"fts_tika_max_response_size": "10485760",
 	"fts_tika_exts": "pdf,doc,docx,xls,xlsx,ppt,pptx,odt,ods,odp,rtf,txt,md,html,htm,epub,csv",
 	"fts_tika_max_file_size": "26214400",
 	"fts_chunk_size": "2000",
diff --git a/pkg/filemanager/manager/fulltextindex.go b/pkg/filemanager/manager/fulltextindex.go
index 4c6e8d91..abbfe4f3 100644
--- a/pkg/filemanager/manager/fulltextindex.go
+++ b/pkg/filemanager/manager/fulltextindex.go
@@ -389,7 +389,7 @@ func (t *FullTextIndexTask) Do(ctx context.Context) (task.Status, error) {
 // performIndexing extracts text from the entity and indexes it. This is shared between
 // the regular index task and the copy task (as a fallback when copy fails).
 func performIndexing(ctx context.Context, fm *manager, uri *fs.URI, entityID, fileID, ownerID int, fileName string, deleteOldChunks bool) (task.Status, error) {
-	dep := fm.dep
+	dep := dependency.FromContext(ctx)
 	l := dep.Logger()
 
 	// Get entity source
@@ -438,6 +438,13 @@ func performIndexing(ctx context.Context, fm *manager, uri *fs.URI, entityID, fi
 	return task.StatusCompleted, nil
 }
 
+// ShouldExtractText checks if a file is eligible for text extraction based on
+// the extractor's supported extensions and max file size. This is exported for
+// use by the rebuild index workflow.
+func ShouldExtractText(extractor searcher.TextExtractor, fileName string, size int64) bool {
+	return util.IsInExtensionList(extractor.Exts(), fileName) && extractor.MaxFileSize() > size
+}
+
 // shouldIndexFullText checks if a file should be indexed for full-text search.
 func (m *manager) shouldIndexFullText(ctx context.Context, fileName string, size int64) bool {
 	if !m.settings.FTSEnabled(ctx) {
@@ -445,7 +452,7 @@ func (m *manager) shouldIndexFullText(ctx context.Context, fileName string, size
 	}
 
 	extractor := m.dep.TextExtractor(ctx)
-	return util.IsInExtensionList(extractor.Exts(), fileName) && extractor.MaxFileSize() > size
+	return ShouldExtractText(extractor, fileName, size)
 }
 
 // fullTextIndexForNewEntity creates and queues a full text index task for a newly uploaded entity.
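For orientation, `ShouldExtractText` is a pure predicate over the extractor's advertised extension list and size cap, so its behavior can be checked without a running Tika backend. A minimal standalone sketch of the same logic — `stubExtractor` is a hypothetical stand-in for `searcher.TextExtractor`, and `util.IsInExtensionList` is mirrored inline since it is not importable here:

```go
package main

import (
	"fmt"
	"path/filepath"
	"strings"
)

// stubExtractor is a hypothetical stand-in carrying only the two
// properties that ShouldExtractText consults.
type stubExtractor struct {
	exts    []string
	maxSize int64
}

func (s *stubExtractor) Exts() []string     { return s.exts }
func (s *stubExtractor) MaxFileSize() int64 { return s.maxSize }

// shouldExtractText mirrors manager.ShouldExtractText: the extension must be
// in the allow-list and the size must be strictly below the cap.
func shouldExtractText(e *stubExtractor, fileName string, size int64) bool {
	ext := strings.TrimPrefix(strings.ToLower(filepath.Ext(fileName)), ".")
	for _, allowed := range e.Exts() {
		if ext == allowed {
			return e.MaxFileSize() > size
		}
	}
	return false
}

func main() {
	e := &stubExtractor{exts: []string{"pdf", "txt"}, maxSize: 26214400}
	fmt.Println(shouldExtractText(e, "report.PDF", 1024))  // true
	fmt.Println(shouldExtractText(e, "movie.mkv", 1024))   // false: extension not allowed
	fmt.Println(shouldExtractText(e, "huge.pdf", 1<<30))   // false: at or over the size cap
}
```

Note the strict `>` comparison: a file exactly at the configured maximum is rejected, matching the expression in the diff.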
diff --git a/pkg/filemanager/workflows/rebuild_index.go b/pkg/filemanager/workflows/rebuild_index.go
new file mode 100644
index 00000000..8049ed66
--- /dev/null
+++ b/pkg/filemanager/workflows/rebuild_index.go
@@ -0,0 +1,312 @@
+package workflows
+
+import (
+	"context"
+	"encoding/json"
+	"fmt"
+	"slices"
+	"sync"
+	"sync/atomic"
+
+	"github.com/cloudreve/Cloudreve/v4/application/dependency"
+	"github.com/cloudreve/Cloudreve/v4/ent"
+	"github.com/cloudreve/Cloudreve/v4/ent/task"
+	"github.com/cloudreve/Cloudreve/v4/inventory"
+	"github.com/cloudreve/Cloudreve/v4/inventory/types"
+	"github.com/cloudreve/Cloudreve/v4/pkg/filemanager/fs/dbfs"
+	"github.com/cloudreve/Cloudreve/v4/pkg/filemanager/manager"
+	"github.com/cloudreve/Cloudreve/v4/pkg/hashid"
+	"github.com/cloudreve/Cloudreve/v4/pkg/logging"
+	"github.com/cloudreve/Cloudreve/v4/pkg/queue"
+	"github.com/cloudreve/Cloudreve/v4/pkg/searcher"
+)
+
+type (
+	RebuildIndexTask struct {
+		*queue.DBTask
+
+		l        logging.Logger
+		state    *RebuildIndexTaskState
+		progress queue.Progresses
+	}
+	RebuildIndexTaskPhase string
+	RebuildIndexTaskState struct {
+		Phase                 RebuildIndexTaskPhase `json:"phase"`
+		Total                 int                   `json:"total"`
+		Indexed               int                   `json:"indexed"`
+		LastFileID            int                   `json:"last_file_id"`
+		Failed                int                   `json:"failed"`
+		FilteredStoragePolicy []int                 `json:"filtered_storage_policy"`
+	}
+)
+
+const (
+	RebuildIndexPhaseNuke  RebuildIndexTaskPhase = "nuke"
+	RebuildIndexPhaseIndex RebuildIndexTaskPhase = "index"
+
+	RebuildIndexBatchSize  = 1000
+	RebuildIndexConcurrent = 4
+
+	ProgressTypeRebuildIndex = "rebuild_index"
+)
+
+func init() {
+	queue.RegisterResumableTaskFactory(queue.FullTextRebuildTaskType, NewRebuildIndexTaskFromModel)
+}
+
+func NewRebuildIndexTask(ctx context.Context, u *ent.User, filteredStoragePolicy []int) (queue.Task, error) {
+	state := &RebuildIndexTaskState{
+		Phase:                 RebuildIndexPhaseNuke,
+		FilteredStoragePolicy: filteredStoragePolicy,
+	}
+	stateBytes, err := json.Marshal(state)
+	if err != nil {
+		return nil, fmt.Errorf("failed to marshal state: %w", err)
+	}
+
+	return &RebuildIndexTask{
+		DBTask: &queue.DBTask{
+			Task: &ent.Task{
+				Type:          queue.FullTextRebuildTaskType,
+				CorrelationID: logging.CorrelationID(ctx),
+				PrivateState:  string(stateBytes),
+				PublicState:   &types.TaskPublicState{},
+			},
+			DirectOwner: u,
+		},
+	}, nil
+}
+
+func NewRebuildIndexTaskFromModel(t *ent.Task) queue.Task {
+	return &RebuildIndexTask{
+		DBTask: &queue.DBTask{
+			Task: t,
+		},
+	}
+}
+
+func (m *RebuildIndexTask) Do(ctx context.Context) (task.Status, error) {
+	dep := dependency.FromContext(ctx)
+	m.l = dep.Logger()
+
+	m.Lock()
+	if m.progress == nil {
+		m.progress = make(queue.Progresses)
+	}
+	m.progress[ProgressTypeRebuildIndex] = &queue.Progress{}
+	m.Unlock()
+
+	state := &RebuildIndexTaskState{}
+	if err := json.Unmarshal([]byte(m.State()), state); err != nil {
+		return task.StatusError, fmt.Errorf("failed to unmarshal state: %s (%w)", err, queue.CriticalErr)
+	}
+	m.state = state
+
+	var (
+		next = task.StatusCompleted
+		err  error
+	)
+	switch m.state.Phase {
+	case RebuildIndexPhaseNuke, "":
+		next, err = m.nuke(ctx, dep)
+	case RebuildIndexPhaseIndex:
+		next, err = m.index(ctx, dep)
+	default:
+		next, err = task.StatusError, fmt.Errorf("unknown phase %q: %w", m.state.Phase, queue.CriticalErr)
+	}
+
+	newStateStr, marshalErr := json.Marshal(m.state)
+	if marshalErr != nil {
+		return task.StatusError, fmt.Errorf("failed to marshal state: %w", marshalErr)
+	}
+
+	m.Lock()
+	m.Task.PrivateState = string(newStateStr)
+	m.Unlock()
+
+	return next, err
+}
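Because `Do` round-trips the whole task state through `PrivateState` on every suspension, the persisted JSON fully determines where a resumed task picks up. A standalone sketch of what a mid-run snapshot might look like (struct mirrored from the file above; all values hypothetical):

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Mirrors RebuildIndexTaskState from rebuild_index.go.
type RebuildIndexTaskState struct {
	Phase                 string `json:"phase"`
	Total                 int    `json:"total"`
	Indexed               int    `json:"indexed"`
	LastFileID            int    `json:"last_file_id"`
	Failed                int    `json:"failed"`
	FilteredStoragePolicy []int  `json:"filtered_storage_policy"`
}

func main() {
	// Hypothetical snapshot after three batches of 1000 files.
	s := RebuildIndexTaskState{
		Phase:                 "index",
		Total:                 12500,
		Indexed:               3000,
		LastFileID:            48211,
		Failed:                2,
		FilteredStoragePolicy: []int{1},
	}
	b, _ := json.Marshal(s)
	fmt.Println(string(b))
	// {"phase":"index","total":12500,"indexed":3000,"last_file_id":48211,"failed":2,"filtered_storage_policy":[1]}
}
```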
+
+// nuke deletes all existing index documents and ensures a fresh index exists,
+// then counts total indexable files for progress tracking.
+func (m *RebuildIndexTask) nuke(ctx context.Context, dep dependency.Dep) (task.Status, error) {
+	indexer := dep.SearchIndexer(ctx)
+
+	m.l.Info("Deleting all existing index documents...")
+	if err := indexer.DeleteAll(ctx); err != nil {
+		return task.StatusError, fmt.Errorf("failed to delete all index documents: %w", err)
+	}
+
+	if err := dep.FileClient().DeleteAllMetadataByName(ctx, dbfs.FullTextIndexKey); err != nil {
+		return task.StatusError, fmt.Errorf("failed to delete all metadata by name: %w", err)
+	}
+
+	m.l.Info("Ensuring index exists with correct configuration...")
+	if err := indexer.EnsureIndex(ctx); err != nil {
+		return task.StatusError, fmt.Errorf("failed to ensure index: %w", err)
+	}
+
+	// Count total indexable files
+	total, err := dep.FileClient().CountIndexableFiles(ctx)
+	if err != nil {
+		return task.StatusError, fmt.Errorf("failed to count indexable files: %w", err)
+	}
+
+	m.state.Total = total
+	m.state.Phase = RebuildIndexPhaseIndex
+	m.state.LastFileID = 0
+	m.state.Indexed = 0
+
+	m.l.Info("Found %d indexable files, starting rebuild...", total)
+	m.ResumeAfter(0)
+	return task.StatusSuspending, nil
+}
+
+// index processes a batch of files and suspends for the next batch.
+func (m *RebuildIndexTask) index(ctx context.Context, dep dependency.Dep) (task.Status, error) {
+	atomic.StoreInt64(&m.progress[ProgressTypeRebuildIndex].Total, int64(m.state.Total))
+	atomic.StoreInt64(&m.progress[ProgressTypeRebuildIndex].Current, int64(m.state.Indexed))
+
+	files, err := dep.FileClient().ListIndexableFiles(ctx, m.state.LastFileID, RebuildIndexBatchSize)
+	if err != nil {
+		return task.StatusError, fmt.Errorf("failed to list indexable files after ID %d: %w", m.state.LastFileID, err)
+	}
+
+	if len(files) == 0 {
+		m.l.Info("Rebuild complete. %d files indexed, %d failed.", m.state.Indexed-m.state.Failed, m.state.Failed)
+		return task.StatusCompleted, nil
+	}
+
+	batchFailed := m.processBatch(ctx, dep, files)
+	m.state.Failed += batchFailed
+	m.state.Indexed += len(files)
+	m.state.LastFileID = files[len(files)-1].ID
+
+	atomic.StoreInt64(&m.progress[ProgressTypeRebuildIndex].Current, int64(m.state.Indexed))
+
+	// Suspend and resume for next batch
+	m.ResumeAfter(0)
+	return task.StatusSuspending, nil
+}
+
+// processBatch indexes a batch of files concurrently.
+func (m *RebuildIndexTask) processBatch(ctx context.Context, dep dependency.Dep, files []*ent.File) int {
+	user := inventory.UserFromContext(ctx)
+
+	var (
+		wg     sync.WaitGroup
+		mu     sync.Mutex
+		failed int
+	)
+
+	sem := make(chan struct{}, RebuildIndexConcurrent)
+	indexer := dep.SearchIndexer(ctx)
+	extractor := dep.TextExtractor(ctx)
+
+	for _, f := range files {
+		select {
+		case <-ctx.Done():
+			// Wait for in-flight workers so none update failed after return.
+			wg.Wait()
+			return failed
+		case sem <- struct{}{}:
+		}
+
+		wg.Add(1)
+		go func(f *ent.File) {
+			defer func() {
+				<-sem
+				wg.Done()
+			}()
+
+			if err := m.indexSingleFile(ctx, dep, user, indexer, extractor, f); err != nil {
+				m.l.Warning("Failed to index file %d (%s): %s", f.ID, f.Name, err)
+				mu.Lock()
+				failed++
+				mu.Unlock()
+			}
+		}(f)
+	}
+
+	wg.Wait()
+	return failed
+}
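The batch loop advances a keyset cursor (`LastFileID`) instead of an offset, so each resume is a cheap indexed range scan rather than an increasingly expensive `OFFSET`. The same pattern in isolation — a sketch where `fetchPage` stands in for `FileClient.ListIndexableFiles`:

```go
package main

import "fmt"

// fetchPage returns up to limit IDs strictly greater than afterID, ascending.
type fetchPage func(afterID, limit int) []int

// forEachBatch drains the source batch by batch, carrying the cursor forward
// the same way RebuildIndexTask.index does with state.LastFileID.
func forEachBatch(fetch fetchPage, limit int, handle func([]int)) {
	after := 0
	for {
		ids := fetch(after, limit)
		if len(ids) == 0 {
			return // empty page: the rebuild is complete
		}
		handle(ids)
		after = ids[len(ids)-1] // keyset cursor: last ID of the batch
	}
}

func main() {
	all := []int{3, 7, 8, 15, 22, 23, 42}
	fetch := func(afterID, limit int) []int {
		var page []int
		for _, id := range all {
			if id > afterID && len(page) < limit {
				page = append(page, id)
			}
		}
		return page
	}
	forEachBatch(fetch, 3, func(ids []int) { fmt.Println(ids) })
	// [3 7 8]
	// [15 22 23]
	// [42]
}
```

A side effect of the strictly-greater cursor is that the task never revisits a file, even if the same batch is re-run after a crash mid-suspension.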
+
+// indexSingleFile extracts text from a single file and indexes it.
+func (m *RebuildIndexTask) indexSingleFile(
+	ctx context.Context,
+	dep dependency.Dep,
+	user *ent.User,
+	indexer searcher.SearchIndexer,
+	extractor searcher.TextExtractor,
+	f *ent.File,
+) error {
+	fm := manager.NewFileManager(dep, user)
+	defer fm.Recycle()
+
+	entityID := f.PrimaryEntity
+	if entityID == 0 {
+		// No primary entity to extract text from; skip this file.
+		m.l.Debug("No primary entity for file %d, skipping.", f.ID)
+		return nil
+	}
+
+	// Check if this file type is eligible for text extraction
+	var text string
+	if manager.ShouldExtractText(extractor, f.Name, f.Size) {
+		source, err := fm.GetEntitySource(ctx, entityID)
+		if err != nil {
+			// Cannot get source; propagate so the batch counts this file as failed.
+			return fmt.Errorf("cannot get entity source for file %d: %w", f.ID, err)
+		}
+		defer source.Close()
+
+		if len(m.state.FilteredStoragePolicy) > 0 {
+			if !slices.Contains(m.state.FilteredStoragePolicy, source.Entity().PolicyID()) {
+				m.l.Debug("Entity source for file %d is not in filtered storage policy, skipping.", f.ID)
+				return nil
+			}
+		}
+
+		extracted, err := extractor.Extract(ctx, source)
+		if err != nil {
+			// Extraction failures are tolerated; the file is skipped, not failed.
+			m.l.Debug("Failed to extract text for file %d: %s, skipping.", f.ID, err)
+			return nil
+		}
+		text = extracted
+
+		if err := indexer.IndexFile(ctx, f.OwnerID, f.ID, entityID, f.Name, text); err != nil {
+			return fmt.Errorf("failed to index file %d: %w", f.ID, err)
+		}
+
+		if err := dep.FileClient().UpsertMetadata(ctx, f, map[string]string{
+			dbfs.FullTextIndexKey: hashid.EncodeEntityID(dep.HashIDEncoder(), entityID),
+		}, nil); err != nil {
+			m.l.Warning("Failed to upsert metadata for file %d: %s", f.ID, err)
+		}
+	}
+
+	return nil
+}
+
+func (m *RebuildIndexTask) Progress(ctx context.Context) queue.Progresses {
+	m.Lock()
+	defer m.Unlock()
+	return m.progress
+}
+
+func (m *RebuildIndexTask) Summarize(hasher hashid.Encoder) *queue.Summary {
+	if m.state == nil {
+		if err := json.Unmarshal([]byte(m.State()), &m.state); err != nil {
+			return nil
+		}
+	}
+
+	return &queue.Summary{
+		Phase: string(m.state.Phase),
+		Props: map[string]any{
+			SummaryKeyFailed: m.state.Failed,
+			SummaryKeyTotal:  m.state.Total,
+		},
+	}
+}
diff --git a/pkg/filemanager/workflows/remote_download.go b/pkg/filemanager/workflows/remote_download.go
index 4425c4d9..e614cbd1 100644
--- a/pkg/filemanager/workflows/remote_download.go
+++ b/pkg/filemanager/workflows/remote_download.go
@@ -73,6 +73,7 @@ const (
 	SummaryKeySrcMultiple    = "src_multiple"
 	SummaryKeySrcDstPolicyID = "dst_policy_id"
 	SummaryKeyFailed         = "failed"
+	SummaryKeyTotal          = "total"
 )
 
 func init() {
diff --git a/pkg/queue/task.go b/pkg/queue/task.go
index ee845126..0a2d9fc9 100644
--- a/pkg/queue/task.go
+++ b/pkg/queue/task.go
@@ -109,6 +109,7 @@ const (
 	FullTextCopyTaskType        = "full_text_copy"
 	FullTextChangeOwnerTaskType = "full_text_change_owner"
 	FullTextDeleteTaskType      = "full_text_delete"
+	FullTextRebuildTaskType     = "full_text_rebuild"
 
 	SlaveCreateArchiveTaskType = "slave_create_archive"
 	SlaveUploadTaskType        = "slave_upload"
diff --git a/pkg/searcher/extractor/tika.go b/pkg/searcher/extractor/tika.go
index ec0b66ff..dbe02fff 100644
--- a/pkg/searcher/extractor/tika.go
+++ b/pkg/searcher/extractor/tika.go
@@ -18,6 +18,7 @@ type TikaExtractor struct {
 	l           logging.Logger
 	exts        []string
 	maxFileSize int64
+	endpoint    string
 }
 
 // NewTikaExtractor creates a new TikaExtractor.
@@ -29,6 +30,7 @@ func NewTikaExtractor(client request.Client, settings setting.Provider, l loggin
 		l:           l,
 		exts:        exts,
 		maxFileSize: cfg.MaxFileSize,
+		endpoint:    cfg.Endpoint,
 	}
 }
 
@@ -44,12 +46,11 @@ func (t *TikaExtractor) MaxFileSize() int64 {
 
 // Extract sends the document to Tika and returns the extracted plain text.
 func (t *TikaExtractor) Extract(ctx context.Context, reader io.Reader) (string, error) {
-	tikaCfg := t.settings.FTSTikaExtractor(ctx)
-	if tikaCfg.Endpoint == "" {
+	if t.endpoint == "" {
 		return "", fmt.Errorf("tika endpoint not configured")
 	}
 
-	endpoint := strings.TrimRight(tikaCfg.Endpoint, "/") + "/tika"
+	endpoint := strings.TrimRight(t.endpoint, "/") + "/tika"
 	resp := t.client.Request(
 		"PUT",
 		endpoint,
@@ -67,13 +68,7 @@ func (t *TikaExtractor) Extract(ctx context.Context, reader io.Reader) (string,
 		return "", fmt.Errorf("tika returned status %d", resp.Response.StatusCode)
 	}
 
-	maxSize := tikaCfg.MaxResponseSize
-	if maxSize <= 0 {
-		maxSize = 10 * 1024 * 1024 // default 10MB
-	}
-
-	limited := io.LimitReader(resp.Response.Body, maxSize)
-	body, err := io.ReadAll(limited)
+	body, err := io.ReadAll(resp.Response.Body)
 	if err != nil {
 		return "", fmt.Errorf("failed to read tika response: %w", err)
 	}
diff --git a/pkg/searcher/indexer.go b/pkg/searcher/indexer.go
index 0e7b6518..13d065ff 100644
--- a/pkg/searcher/indexer.go
+++ b/pkg/searcher/indexer.go
@@ -39,6 +39,8 @@ type SearchIndexer interface {
 	// configuration (filterable/searchable attributes, etc.).
 	IndexReady(ctx context.Context) (bool, error)
 	EnsureIndex(ctx context.Context) error
+	// DeleteAll removes all documents from the index.
+	DeleteAll(ctx context.Context) error
 	Close() error
 }
 
diff --git a/pkg/searcher/indexer/meilisearch.go b/pkg/searcher/indexer/meilisearch.go
index 22c34393..ee065c52 100644
--- a/pkg/searcher/indexer/meilisearch.go
+++ b/pkg/searcher/indexer/meilisearch.go
@@ -14,8 +14,9 @@ import (
 )
 
 const (
-	indexName    = "cloudreve_files"
-	embedderName = "cr-text"
+	indexName         = "cloudreve_files"
+	embedderName      = "cr-text"
+	embeddingTemplate = "Chunk #{{doc.chunk_idx}} in a file named '{{doc.file_name}}': {{ doc.text }}"
)
 
 // MeilisearchIndexer implements SearchIndexer using Meilisearch.
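Meilisearch renders an embedder's document template (Liquid syntax) against each document server-side before computing its embedding, so the template above folds the file name into every chunk's embedding input. A rough local illustration of what it would produce for one chunk — the field names are inferred from the template itself, and the string substitution below merely imitates the rendering that actually happens inside Meilisearch:

```go
package main

import (
	"fmt"
	"strings"
)

// chunkDoc mirrors the fields the template references (assumed shape).
type chunkDoc struct {
	ChunkIdx int
	FileName string
	Text     string
}

// renderTemplate is a stand-in for Meilisearch's server-side Liquid rendering.
func renderTemplate(d chunkDoc) string {
	r := strings.NewReplacer(
		"{{doc.chunk_idx}}", fmt.Sprint(d.ChunkIdx),
		"{{doc.file_name}}", d.FileName,
		"{{ doc.text }}", d.Text,
	)
	return r.Replace("Chunk #{{doc.chunk_idx}} in a file named '{{doc.file_name}}': {{ doc.text }}")
}

func main() {
	fmt.Println(renderTemplate(chunkDoc{ChunkIdx: 0, FileName: "report.pdf", Text: "Q3 revenue grew 12%."}))
	// Chunk #0 in a file named 'report.pdf': Q3 revenue grew 12%.
}
```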
@@ -120,6 +121,7 @@ func (m *MeilisearchIndexer) EnsureIndex(ctx context.Context) error {
 		return nil
 	}
 
+	embedder.DocumentTemplate = embeddingTemplate
 	_, err := index.UpdateEmbeddersWithContext(ctx, map[string]meilisearch.Embedder{
 		embedderName: embedder,
 	})
@@ -379,6 +381,14 @@ func (m *MeilisearchIndexer) Search(ctx context.Context, ownerID int, query stri
 	return results, resp.EstimatedTotalHits, nil
 }
 
+func (m *MeilisearchIndexer) DeleteAll(ctx context.Context) error {
+	index := m.client.Index(indexName)
+	if _, err := index.DeleteAllDocumentsWithContext(ctx, nil); err != nil {
+		return fmt.Errorf("failed to delete all documents: %w", err)
+	}
+	return nil
+}
+
 func (m *MeilisearchIndexer) Close() error {
 	return nil
 }
diff --git a/pkg/searcher/indexer/noop.go b/pkg/searcher/indexer/noop.go
index 3a4b608b..e2ae396e 100644
--- a/pkg/searcher/indexer/noop.go
+++ b/pkg/searcher/indexer/noop.go
@@ -41,6 +41,10 @@ func (n *NoopIndexer) EnsureIndex(ctx context.Context) error {
 	return nil
 }
 
+func (n *NoopIndexer) DeleteAll(ctx context.Context) error {
+	return nil
+}
+
 func (n *NoopIndexer) Close() error {
 	return nil
 }
diff --git a/pkg/setting/provider.go b/pkg/setting/provider.go
index 5bc37775..d9d1f43e 100644
--- a/pkg/setting/provider.go
+++ b/pkg/setting/provider.go
@@ -636,10 +636,9 @@ func (s *settingProvider) FTSIndexMeilisearch(ctx context.Context) *FTSIndexMeil
 
 func (s *settingProvider) FTSTikaExtractor(ctx context.Context) *FTSTikaExtractorSetting {
 	return &FTSTikaExtractorSetting{
-		Endpoint:        s.getString(ctx, "fts_tika_endpoint", ""),
-		MaxResponseSize: s.getInt64(ctx, "fts_tika_max_response_size", 10485760),
-		Exts:            s.getStringList(ctx, "fts_tika_exts", []string{}),
-		MaxFileSize:     s.getInt64(ctx, "fts_tika_max_file_size_remote", 52428800),
+		Endpoint:    s.getString(ctx, "fts_tika_endpoint", ""),
+		Exts:        s.getStringList(ctx, "fts_tika_exts", []string{}),
+		MaxFileSize: s.getInt64(ctx, "fts_tika_max_file_size", 26214400),
 	}
 }
 
diff --git a/pkg/setting/types.go b/pkg/setting/types.go
index db75bc79..7c9b975e 100644
--- a/pkg/setting/types.go
+++ b/pkg/setting/types.go
@@ -248,10 +248,9 @@ type FTSIndexMeilisearchSetting struct {
 }
 
 type FTSTikaExtractorSetting struct {
-	Endpoint        string
-	MaxResponseSize int64
-	Exts            []string
-	MaxFileSize     int64
+	Endpoint    string
+	Exts        []string
+	MaxFileSize int64
 }
 
 type MasterEncryptKeyVaultType string
diff --git a/routers/controllers/file.go b/routers/controllers/file.go
index cc20b102..73039788 100644
--- a/routers/controllers/file.go
+++ b/routers/controllers/file.go
@@ -68,6 +68,21 @@ func CreateRemoteDownload(c *gin.Context) {
 	}
 }
 
+// RebuildFTSIndex creates a task to rebuild the full text search index
+func RebuildFTSIndex(c *gin.Context) {
+	service := ParametersFromContext[*explorer.RebuildFTSIndexWorkflowService](c, explorer.CreateRebuildFTSIndexParamCtx{})
+	resp, err := service.CreateRebuildFTSIndexTask(c)
+	if err != nil {
+		c.JSON(200, serializer.Err(c, err))
+		c.Abort()
+		return
+	}
+
+	c.JSON(200, serializer.Response{
+		Data: resp,
+	})
+}
+
 // ExtractArchive creates extract archive task
 func ExtractArchive(c *gin.Context) {
 	service := ParametersFromContext[*explorer.ArchiveWorkflowService](c, explorer.CreateArchiveParamCtx{})
diff --git a/routers/router.go b/routers/router.go
index 1164b8ab..8b357d82 100644
--- a/routers/router.go
+++ b/routers/router.go
@@ -755,6 +755,13 @@ func initMasterRouter(dep dependency.Dep) *gin.Engine {
 				controllers.FromJSON[explorer.ImportWorkflowService](explorer.CreateImportParamCtx{}),
 				controllers.ImportFiles,
 			)
+			// Create task to rebuild full text search index
+			wf.POST("rebuildFtsIndex",
+				middleware.IsAdmin(),
+				middleware.RequiredScopes(types.ScopeWorkflowWrite, types.ScopeAdminWrite),
+				controllers.FromJSON[explorer.RebuildFTSIndexWorkflowService](explorer.CreateRebuildFTSIndexParamCtx{}),
+				controllers.RebuildFTSIndex,
+			)
 
 			// Get file source link
 			source := file.Group("source")
diff --git a/service/admin/site.go b/service/admin/site.go
index a316a322..20498fa1 100644
--- a/service/admin/site.go
+++ b/service/admin/site.go
@@ -263,6 +263,14 @@ var (
 		"queue_remote_download_max_retry":   remoteDownloadQueuePostProcessor,
 		"queue_remote_download_retry_delay": remoteDownloadQueuePostProcessor,
 		"secret_key":                        secretKeyPostProcessor,
+		"fts_meilisearch_embed_config":      meilisearchPostProcessor,
+		"fts_meilisearch_endpoint":          meilisearchPostProcessor,
+		"fts_meilisearch_api_key":           meilisearchPostProcessor,
+		"fts_meilisearch_embed_enabled":     meilisearchPostProcessor,
+		"fts_meilisearch_page_size":         meilisearchPostProcessor,
+		"fts_tika_endpoint":                 tikaPostProcessor,
+		"fts_tika_exts":                     tikaPostProcessor,
+		"fts_tika_max_file_size":            tikaPostProcessor,
 	}
 )
 
@@ -411,3 +419,15 @@ func secretKeyPostProcessor(ctx context.Context, settings map[string]string) err
 	settings["secret_key"] = ""
 	return nil
 }
+
+func meilisearchPostProcessor(ctx context.Context, settings map[string]string) error {
+	dep := dependency.FromContext(ctx)
+	dep.SearchIndexer(context.WithValue(ctx, dependency.ReloadCtx{}, true))
+	return nil
+}
+
+func tikaPostProcessor(ctx context.Context, settings map[string]string) error {
+	dep := dependency.FromContext(ctx)
+	dep.TextExtractor(context.WithValue(ctx, dependency.ReloadCtx{}, true))
+	return nil
+}
diff --git a/service/explorer/workflows.go b/service/explorer/workflows.go
index ad7cf098..8bef8c95 100644
--- a/service/explorer/workflows.go
+++ b/service/explorer/workflows.go
@@ -450,3 +450,32 @@ func (service *SetDownloadFilesService) SetDownloadFiles(c *gin.Context, taskID
 
 	return nil
 }
+
+type (
+	RebuildFTSIndexWorkflowService struct {
+		FilteredStoragePolicy []int `json:"filtered_storage_policy"`
+	}
+	CreateRebuildFTSIndexParamCtx struct{}
+)
+
+func (service *RebuildFTSIndexWorkflowService) CreateRebuildFTSIndexTask(c *gin.Context) (*TaskResponse, error) {
+	dep := dependency.FromContext(c)
+	user := inventory.UserFromContext(c)
+	hasher := dep.HashIDEncoder()
+
+	if !user.Edges.Group.Permissions.Enabled(int(types.GroupPermissionIsAdmin)) {
+		return nil, serializer.NewError(serializer.CodeGroupNotAllowed, "Only admin can rebuild the full text search index", nil)
+	}
+
+	// Create task
+	t, err := workflows.NewRebuildIndexTask(c, user, service.FilteredStoragePolicy)
+	if err != nil {
+		return nil, serializer.NewError(serializer.CodeCreateTaskError, "Failed to create task", err)
+	}
+
+	if err := dep.MediaMetaQueue(c).QueueTask(c, t); err != nil {
+		return nil, serializer.NewError(serializer.CodeCreateTaskError, "Failed to queue task", err)
+	}
+
+	return BuildTaskResponse(t, nil, hasher), nil
+}
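End to end, an admin client kicks off a rebuild with one authenticated POST to the new workflow route. A sketch of such a call — the URL prefix and auth header are deployment-specific assumptions, not defined in this PR:

```go
package main

import (
	"bytes"
	"fmt"
	"net/http"
)

func main() {
	// Restrict extraction to entities on storage policies 1 and 2; send an
	// empty list (or omit the field) to rebuild across all policies.
	payload := bytes.NewBufferString(`{"filtered_storage_policy": [1, 2]}`)

	// Assumed prefix; match it to where initMasterRouter mounts the workflow group.
	req, err := http.NewRequest(http.MethodPost,
		"https://cloudreve.example.com/api/v4/workflow/rebuildFtsIndex", payload)
	if err != nil {
		panic(err)
	}
	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("Authorization", "Bearer <admin-token>") // admin + workflow write scopes required

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	fmt.Println(resp.Status) // response Data carries the created task, per BuildTaskResponse
}
```

The created task then runs through the nuke and index phases on the MediaMeta queue, and its progress is observable through the usual task listing endpoints via `Progress` and `Summarize`.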