feat(fts): start background task to force build index for existing files (close #2895)

pull/3315/head
Aaron Liu 2 weeks ago
parent 1e3b851e19
commit 153a00ecd5

@ -648,6 +648,7 @@ func (d *dependency) MediaMetaQueue(ctx context.Context) queue.Queue {
queue.MediaMetaTaskType,
queue.FullTextIndexTaskType,
queue.FullTextDeleteTaskType,
queue.FullTextRebuildTaskType,
queue.FullTextCopyTaskType,
queue.FullTextChangeOwnerTaskType,
),

@ -1 +1 @@
Subproject commit 4a38a946cb40c3fe9878af0945c33a8b6987637b
Subproject commit e3e7fd331c869ea67393a5ac5d96ee9fbd8e30ab

@ -209,6 +209,11 @@ type FileClient interface {
UnlinkEntity(ctx context.Context, entity *ent.Entity, file *ent.File, owner *ent.User) (StorageDiff, error)
// CreateDirectLink creates a direct link for a file
CreateDirectLink(ctx context.Context, fileID int, name string, speed int, reuse bool) (*ent.DirectLink, error)
// CountIndexableFiles counts files suitable for FTS indexing (non-empty name, has parent, is file type).
CountIndexableFiles(ctx context.Context) (int, error)
// ListIndexableFiles lists files suitable for FTS indexing, returning up to limit files
// with ID strictly greater than afterID. Use afterID=0 to start from the beginning.
ListIndexableFiles(ctx context.Context, afterID, limit int) ([]*ent.File, error)
// CountByTimeRange counts files created in a given time range
CountByTimeRange(ctx context.Context, start, end *time.Time) (int, error)
// CountEntityByTimeRange counts entities created in a given time range
@ -229,6 +234,8 @@ type FileClient interface {
UpdateProps(ctx context.Context, file *ent.File, props *types.FileProps) (*ent.File, error)
// UpdateModifiedAt updates modified at of a file
UpdateModifiedAt(ctx context.Context, file *ent.File, modifiedAt time.Time) error
// DeleteAllMetadataByName deletes all metadata by a given name
DeleteAllMetadataByName(ctx context.Context, name string) error
}
func NewFileClient(client *ent.Client, dbType conf.DBType, hasher hashid.Encoder) FileClient {
@ -314,6 +321,36 @@ func (f *fileClient) CountByTimeRange(ctx context.Context, start, end *time.Time
return f.client.File.Query().Where(file.CreatedAtGTE(*start), file.CreatedAtLT(*end)).Count(ctx)
}
func (f *fileClient) DeleteAllMetadataByName(ctx context.Context, name string) error {
_, err := f.client.Metadata.Delete().Where(metadata.Name(name)).Exec(schema.SkipSoftDelete(ctx))
if err != nil {
return fmt.Errorf("failed to delete metadata: %w", err)
}
return nil
}
func (f *fileClient) CountIndexableFiles(ctx context.Context) (int, error) {
return f.indexableFilesQuery().Count(ctx)
}
func (f *fileClient) ListIndexableFiles(ctx context.Context, afterID, limit int) ([]*ent.File, error) {
q := f.indexableFilesQuery()
if afterID > 0 {
q = q.Where(file.IDGT(afterID))
}
return q.Limit(limit).All(ctx)
}
func (f *fileClient) indexableFilesQuery() *ent.FileQuery {
return f.client.File.Query().Where(
file.Type(int(types.FileTypeFile)),
file.NameNEQ(""),
file.SizeGT(0),
file.FileChildrenNotNil(),
).Order(file.ByID())
}
func (f *fileClient) CountEntityByTimeRange(ctx context.Context, start, end *time.Time) (int, error) {
if start == nil || end == nil {
return f.client.Entity.Query().Count(ctx)

@ -673,15 +673,14 @@ var DefaultSettings = map[string]string{
"fs_event_push_max_age": "1209600",
"fs_event_push_debounce": "5",
"fts_enabled": "0",
"fts_index_type": "",
"fts_extractor_type": "",
"fts_index_type": "meilisearch",
"fts_extractor_type": "tika",
"fts_meilisearch_endpoint": "",
"fts_meilisearch_api_key": "",
"fts_meilisearch_page_size": "5",
"fts_meilisearch_embed_enabled": "0",
"fts_meilisearch_embed_config": "{}",
"fts_tika_endpoint": "",
"fts_tika_max_response_size": "10485760",
"fts_tika_exts": "pdf,doc,docx,xls,xlsx,ppt,pptx,odt,ods,odp,rtf,txt,md,html,htm,epub,csv",
"fts_tika_max_file_size": "26214400",
"fts_chunk_size": "2000",

@ -389,7 +389,7 @@ func (t *FullTextIndexTask) Do(ctx context.Context) (task.Status, error) {
// performIndexing extracts text from the entity and indexes it. This is shared between
// the regular index task and the copy task (as a fallback when copy fails).
func performIndexing(ctx context.Context, fm *manager, uri *fs.URI, entityID, fileID, ownerID int, fileName string, deleteOldChunks bool) (task.Status, error) {
dep := fm.dep
dep := dependency.FromContext(ctx)
l := dep.Logger()
// Get entity source
@ -438,6 +438,13 @@ func performIndexing(ctx context.Context, fm *manager, uri *fs.URI, entityID, fi
return task.StatusCompleted, nil
}
// ShouldExtractText checks if a file is eligible for text extraction based on
// the extractor's supported extensions and max file size. This is exported for
// use by the rebuild index workflow.
func ShouldExtractText(extractor searcher.TextExtractor, fileName string, size int64) bool {
return util.IsInExtensionList(extractor.Exts(), fileName) && extractor.MaxFileSize() > size
}
// shouldIndexFullText checks if a file should be indexed for full-text search.
func (m *manager) shouldIndexFullText(ctx context.Context, fileName string, size int64) bool {
if !m.settings.FTSEnabled(ctx) {
@ -445,7 +452,7 @@ func (m *manager) shouldIndexFullText(ctx context.Context, fileName string, size
}
extractor := m.dep.TextExtractor(ctx)
return util.IsInExtensionList(extractor.Exts(), fileName) && extractor.MaxFileSize() > size
return ShouldExtractText(extractor, fileName, size)
}
// fullTextIndexForNewEntity creates and queues a full text index task for a newly uploaded entity.

@ -0,0 +1,311 @@
package workflows
import (
"context"
"encoding/json"
"fmt"
"slices"
"sync"
"sync/atomic"
"github.com/cloudreve/Cloudreve/v4/application/dependency"
"github.com/cloudreve/Cloudreve/v4/ent"
"github.com/cloudreve/Cloudreve/v4/ent/task"
"github.com/cloudreve/Cloudreve/v4/inventory"
"github.com/cloudreve/Cloudreve/v4/inventory/types"
"github.com/cloudreve/Cloudreve/v4/pkg/filemanager/fs/dbfs"
"github.com/cloudreve/Cloudreve/v4/pkg/filemanager/manager"
"github.com/cloudreve/Cloudreve/v4/pkg/hashid"
"github.com/cloudreve/Cloudreve/v4/pkg/logging"
"github.com/cloudreve/Cloudreve/v4/pkg/queue"
"github.com/cloudreve/Cloudreve/v4/pkg/searcher"
)
type (
RebuildIndexTask struct {
*queue.DBTask
l logging.Logger
state *RebuildIndexTaskState
progress queue.Progresses
}
RebuildIndexTaskPhase string
RebuildIndexTaskState struct {
Phase RebuildIndexTaskPhase `json:"phase"`
Total int `json:"total"`
Indexed int `json:"indexed"`
LastFileID int `json:"last_file_id"`
Failed int `json:"failed"`
FilteredStoragePolicy []int `json:"filtered_storage_policy"`
}
)
const (
RebuildIndexPhaseNuke RebuildIndexTaskPhase = "nuke"
RebuildIndexPhaseIndex RebuildIndexTaskPhase = "index"
RebuildIndexBatchSize = 1000
RebuildIndexConcurrent = 4
ProgressTypeRebuildIndex = "rebuild_index"
)
func init() {
queue.RegisterResumableTaskFactory(queue.FullTextRebuildTaskType, NewRebuildIndexTaskFromModel)
}
func NewRebuildIndexTask(ctx context.Context, u *ent.User, filteredStoragePolicy []int) (queue.Task, error) {
state := &RebuildIndexTaskState{
Phase: RebuildIndexPhaseNuke,
FilteredStoragePolicy: filteredStoragePolicy,
}
stateBytes, err := json.Marshal(state)
if err != nil {
return nil, fmt.Errorf("failed to marshal state: %w", err)
}
return &RebuildIndexTask{
DBTask: &queue.DBTask{
Task: &ent.Task{
Type: queue.FullTextRebuildTaskType,
CorrelationID: logging.CorrelationID(ctx),
PrivateState: string(stateBytes),
PublicState: &types.TaskPublicState{},
},
DirectOwner: u,
},
}, nil
}
func NewRebuildIndexTaskFromModel(t *ent.Task) queue.Task {
return &RebuildIndexTask{
DBTask: &queue.DBTask{
Task: t,
},
}
}
func (m *RebuildIndexTask) Do(ctx context.Context) (task.Status, error) {
dep := dependency.FromContext(ctx)
m.l = dep.Logger()
m.Lock()
if m.progress == nil {
m.progress = make(queue.Progresses)
}
m.progress[ProgressTypeRebuildIndex] = &queue.Progress{}
m.Unlock()
state := &RebuildIndexTaskState{}
if err := json.Unmarshal([]byte(m.State()), state); err != nil {
return task.StatusError, fmt.Errorf("failed to unmarshal state: %s (%w)", err, queue.CriticalErr)
}
m.state = state
var (
next = task.StatusCompleted
err error
)
switch m.state.Phase {
case RebuildIndexPhaseNuke, "":
next, err = m.nuke(ctx, dep)
case RebuildIndexPhaseIndex:
next, err = m.index(ctx, dep)
default:
next, err = task.StatusError, fmt.Errorf("unknown phase %q: %w", m.state.Phase, queue.CriticalErr)
}
newStateStr, marshalErr := json.Marshal(m.state)
if marshalErr != nil {
return task.StatusError, fmt.Errorf("failed to marshal state: %w", marshalErr)
}
m.Lock()
m.Task.PrivateState = string(newStateStr)
m.Unlock()
return next, err
}
// nuke deletes all existing index documents and ensures a fresh index exists,
// then counts total indexable files for progress tracking.
func (m *RebuildIndexTask) nuke(ctx context.Context, dep dependency.Dep) (task.Status, error) {
indexer := dep.SearchIndexer(ctx)
m.l.Info("Deleting all existing index documents...")
if err := indexer.DeleteAll(ctx); err != nil {
return task.StatusError, fmt.Errorf("failed to delete all index documents: %w", err)
}
if err := dep.FileClient().DeleteAllMetadataByName(ctx, dbfs.FullTextIndexKey); err != nil {
return task.StatusError, fmt.Errorf("failed to delete all metadata by name: %w", err)
}
m.l.Info("Ensuring index exists with correct configuration...")
if err := indexer.EnsureIndex(ctx); err != nil {
return task.StatusError, fmt.Errorf("failed to ensure index: %w", err)
}
// Count total indexable files
total, err := dep.FileClient().CountIndexableFiles(ctx)
if err != nil {
return task.StatusError, fmt.Errorf("failed to count indexable files: %w", err)
}
m.state.Total = total
m.state.Phase = RebuildIndexPhaseIndex
m.state.LastFileID = 0
m.state.Indexed = 0
m.l.Info("Found %d indexable files, starting rebuild...", total)
m.ResumeAfter(0)
return task.StatusSuspending, nil
}
// index processes a batch of files and suspends for the next batch.
func (m *RebuildIndexTask) index(ctx context.Context, dep dependency.Dep) (task.Status, error) {
atomic.StoreInt64(&m.progress[ProgressTypeRebuildIndex].Total, int64(m.state.Total))
atomic.StoreInt64(&m.progress[ProgressTypeRebuildIndex].Current, int64(m.state.Indexed))
files, err := dep.FileClient().ListIndexableFiles(ctx, m.state.LastFileID, RebuildIndexBatchSize)
if err != nil {
return task.StatusError, fmt.Errorf("failed to list indexable files after ID %d: %w", m.state.LastFileID, err)
}
if len(files) == 0 {
m.l.Info("Rebuild complete. %d files indexed, %d failed.", m.state.Indexed-m.state.Failed, m.state.Failed)
return task.StatusCompleted, nil
}
batchFailed := m.processBatch(ctx, dep, files)
m.state.Failed += batchFailed
m.state.Indexed += len(files)
m.state.LastFileID = files[len(files)-1].ID
atomic.StoreInt64(&m.progress[ProgressTypeRebuildIndex].Current, int64(m.state.Indexed))
// Suspend and resume for next batch
m.ResumeAfter(0)
return task.StatusSuspending, nil
}
// processBatch indexes a batch of files concurrently.
func (m *RebuildIndexTask) processBatch(ctx context.Context, dep dependency.Dep, files []*ent.File) int {
user := inventory.UserFromContext(ctx)
var (
wg sync.WaitGroup
mu sync.Mutex
failed int
)
sem := make(chan struct{}, RebuildIndexConcurrent)
indexer := dep.SearchIndexer(ctx)
extractor := dep.TextExtractor(ctx)
for _, f := range files {
select {
case <-ctx.Done():
return failed
case sem <- struct{}{}:
}
wg.Add(1)
go func(f *ent.File) {
defer func() {
<-sem
wg.Done()
}()
if err := m.indexSingleFile(ctx, dep, user, indexer, extractor, f); err != nil {
m.l.Warning("Failed to index file %d (%s): %s", f.ID, f.Name, err)
mu.Lock()
failed++
mu.Unlock()
}
}(f)
}
wg.Wait()
return failed
}
// indexSingleFile extracts text from a single file and indexes it.
func (m *RebuildIndexTask) indexSingleFile(
ctx context.Context,
dep dependency.Dep,
user *ent.User,
indexer searcher.SearchIndexer,
extractor searcher.TextExtractor,
f *ent.File,
) error {
fm := manager.NewFileManager(dep, user)
defer fm.Recycle()
entityID := f.PrimaryEntity
if entityID == 0 {
// No primary entity, index with just the file name (no text content).
m.l.Debug("No primary entity for file %d, skipping.", f.ID)
return nil
}
// Check if this file type is eligible for text extraction
var text string
if manager.ShouldExtractText(extractor, f.Name, f.Size) {
source, err := fm.GetEntitySource(ctx, entityID)
if err != nil {
// Cannot get source; index with file name only.
m.l.Debug("Cannot get entity source for file %d: %s, skipping.", f.ID, err)
return fmt.Errorf("cannot get entity source for file %d: %w", f.ID, err)
}
defer source.Close()
if len(m.state.FilteredStoragePolicy) > 0 {
if !slices.Contains(m.state.FilteredStoragePolicy, source.Entity().PolicyID()) {
m.l.Debug("Entity source for file %d is not in filtered storage policy, skipping.", f.ID)
return nil
}
}
extracted, err := extractor.Extract(ctx, source)
if err != nil {
m.l.Debug("Failed to extract text for file %d: %s, skipping", f.ID, err)
return nil
} else {
text = extracted
}
if err := indexer.IndexFile(ctx, f.OwnerID, f.ID, entityID, f.Name, text); err != nil {
return fmt.Errorf("failed to index file %d: %w", f.ID, err)
}
if err := dep.FileClient().UpsertMetadata(ctx, f, map[string]string{
dbfs.FullTextIndexKey: hashid.EncodeEntityID(dep.HashIDEncoder(), entityID),
}, nil); err != nil {
m.l.Warning("Failed to upsert metadata for file %d: %s", f.ID, err)
}
}
return nil
}
func (m *RebuildIndexTask) Progress(ctx context.Context) queue.Progresses {
m.Lock()
defer m.Unlock()
return m.progress
}
func (m *RebuildIndexTask) Summarize(hasher hashid.Encoder) *queue.Summary {
if m.state == nil {
if err := json.Unmarshal([]byte(m.State()), &m.state); err != nil {
return nil
}
}
return &queue.Summary{
Phase: string(m.state.Phase),
Props: map[string]any{
SummaryKeyFailed: m.state.Failed,
SummaryKeyTotal: m.state.Total,
},
}
}

@ -73,6 +73,7 @@ const (
SummaryKeySrcMultiple = "src_multiple"
SummaryKeySrcDstPolicyID = "dst_policy_id"
SummaryKeyFailed = "failed"
SummaryKeyTotal = "total"
)
func init() {

@ -109,6 +109,7 @@ const (
FullTextCopyTaskType = "full_text_copy"
FullTextChangeOwnerTaskType = "full_text_change_owner"
FullTextDeleteTaskType = "full_text_delete"
FullTextRebuildTaskType = "full_text_rebuild"
SlaveCreateArchiveTaskType = "slave_create_archive"
SlaveUploadTaskType = "slave_upload"

@ -18,6 +18,7 @@ type TikaExtractor struct {
l logging.Logger
exts []string
maxFileSize int64
endpoint string
}
// NewTikaExtractor creates a new TikaExtractor.
@ -29,6 +30,7 @@ func NewTikaExtractor(client request.Client, settings setting.Provider, l loggin
l: l,
exts: exts,
maxFileSize: cfg.MaxFileSize,
endpoint: cfg.Endpoint,
}
}
@ -44,12 +46,11 @@ func (t *TikaExtractor) MaxFileSize() int64 {
// Extract sends the document to Tika and returns the extracted plain text.
func (t *TikaExtractor) Extract(ctx context.Context, reader io.Reader) (string, error) {
tikaCfg := t.settings.FTSTikaExtractor(ctx)
if tikaCfg.Endpoint == "" {
if t.endpoint == "" {
return "", fmt.Errorf("tika endpoint not configured")
}
endpoint := strings.TrimRight(tikaCfg.Endpoint, "/") + "/tika"
endpoint := strings.TrimRight(t.endpoint, "/") + "/tika"
resp := t.client.Request(
"PUT",
endpoint,
@ -67,13 +68,7 @@ func (t *TikaExtractor) Extract(ctx context.Context, reader io.Reader) (string,
return "", fmt.Errorf("tika returned status %d", resp.Response.StatusCode)
}
maxSize := tikaCfg.MaxResponseSize
if maxSize <= 0 {
maxSize = 10 * 1024 * 1024 // default 10MB
}
limited := io.LimitReader(resp.Response.Body, maxSize)
body, err := io.ReadAll(limited)
body, err := io.ReadAll(resp.Response.Body)
if err != nil {
return "", fmt.Errorf("failed to read tika response: %w", err)
}

@ -39,6 +39,8 @@ type SearchIndexer interface {
// configuration (filterable/searchable attributes, etc.).
IndexReady(ctx context.Context) (bool, error)
EnsureIndex(ctx context.Context) error
// DeleteAll removes all documents from the index.
DeleteAll(ctx context.Context) error
Close() error
}

@ -14,8 +14,9 @@ import (
)
const (
indexName = "cloudreve_files"
embedderName = "cr-text"
indexName = "cloudreve_files"
embedderName = "cr-text"
embeddingTemplate = "Chunk #{{doc.chunk_idx}} in a file named '{{doc.file_name}}': {{ doc.text }}"
)
// MeilisearchIndexer implements SearchIndexer using Meilisearch.
@ -120,6 +121,7 @@ func (m *MeilisearchIndexer) EnsureIndex(ctx context.Context) error {
return nil
}
embedder.DocumentTemplate = embeddingTemplate
_, err := index.UpdateEmbeddersWithContext(ctx, map[string]meilisearch.Embedder{
embedderName: embedder,
})
@ -379,6 +381,14 @@ func (m *MeilisearchIndexer) Search(ctx context.Context, ownerID int, query stri
return results, resp.EstimatedTotalHits, nil
}
func (m *MeilisearchIndexer) DeleteAll(ctx context.Context) error {
index := m.client.Index(indexName)
if _, err := index.DeleteAllDocumentsWithContext(ctx, nil); err != nil {
return fmt.Errorf("failed to delete all documents: %w", err)
}
return nil
}
func (m *MeilisearchIndexer) Close() error {
return nil
}

@ -41,6 +41,10 @@ func (n *NoopIndexer) EnsureIndex(ctx context.Context) error {
return nil
}
func (n *NoopIndexer) DeleteAll(ctx context.Context) error {
return nil
}
func (n *NoopIndexer) Close() error {
return nil
}

@ -636,10 +636,9 @@ func (s *settingProvider) FTSIndexMeilisearch(ctx context.Context) *FTSIndexMeil
func (s *settingProvider) FTSTikaExtractor(ctx context.Context) *FTSTikaExtractorSetting {
return &FTSTikaExtractorSetting{
Endpoint: s.getString(ctx, "fts_tika_endpoint", ""),
MaxResponseSize: s.getInt64(ctx, "fts_tika_max_response_size", 10485760),
Exts: s.getStringList(ctx, "fts_tika_exts", []string{}),
MaxFileSize: s.getInt64(ctx, "fts_tika_max_file_size_remote", 52428800),
Endpoint: s.getString(ctx, "fts_tika_endpoint", ""),
Exts: s.getStringList(ctx, "fts_tika_exts", []string{}),
MaxFileSize: s.getInt64(ctx, "fts_tika_max_file_size_remote", 52428800),
}
}

@ -248,10 +248,9 @@ type FTSIndexMeilisearchSetting struct {
}
type FTSTikaExtractorSetting struct {
Endpoint string
MaxResponseSize int64
Exts []string
MaxFileSize int64
Endpoint string
Exts []string
MaxFileSize int64
}
type MasterEncryptKeyVaultType string

@ -68,6 +68,21 @@ func CreateRemoteDownload(c *gin.Context) {
}
}
// RebuildFTSIndex rebuilds full text search index for files
func RebuildFTSIndex(c *gin.Context) {
service := ParametersFromContext[*explorer.RebuildFTSIndexWorkflowService](c, explorer.CreateRebuildFTSIndexParamCtx{})
resp, err := service.CreateRebuildFTSIndexTask(c)
if err != nil {
c.JSON(200, serializer.Err(c, err))
c.Abort()
return
}
c.JSON(200, serializer.Response{
Data: resp,
})
}
// ExtractArchive creates extract archive task
func ExtractArchive(c *gin.Context) {
service := ParametersFromContext[*explorer.ArchiveWorkflowService](c, explorer.CreateArchiveParamCtx{})

@ -755,6 +755,13 @@ func initMasterRouter(dep dependency.Dep) *gin.Engine {
controllers.FromJSON[explorer.ImportWorkflowService](explorer.CreateImportParamCtx{}),
controllers.ImportFiles,
)
// Create task to import files
wf.POST("rebuildFtsIndex",
middleware.IsAdmin(),
middleware.RequiredScopes(types.ScopeWorkflowWrite, types.ScopeAdminWrite),
controllers.FromJSON[explorer.RebuildFTSIndexWorkflowService](explorer.CreateRebuildFTSIndexParamCtx{}),
controllers.RebuildFTSIndex,
)
// 取得文件外链
source := file.Group("source")

@ -263,6 +263,14 @@ var (
"queue_remote_download_max_retry": remoteDownloadQueuePostProcessor,
"queue_remote_download_retry_delay": remoteDownloadQueuePostProcessor,
"secret_key": secretKeyPostProcessor,
"fts_meilisearch_embed_config": meilisearchPostProcessor,
"fts_meilisearch_endpoint": meilisearchPostProcessor,
"fts_meilisearch_api_key": meilisearchPostProcessor,
"fts_meilisearch_embed_enabled": meilisearchPostProcessor,
"fts_meilisearch_page_size": meilisearchPostProcessor,
"fts_tika_endpoint": tikaPostProcessor,
"fts_tika_exts": tikaPostProcessor,
"fts_tika_max_file_size": tikaPostProcessor,
}
)
@ -411,3 +419,15 @@ func secretKeyPostProcessor(ctx context.Context, settings map[string]string) err
settings["secret_key"] = ""
return nil
}
func meilisearchPostProcessor(ctx context.Context, settings map[string]string) error {
dep := dependency.FromContext(ctx)
dep.SearchIndexer(context.WithValue(ctx, dependency.ReloadCtx{}, true))
return nil
}
func tikaPostProcessor(ctx context.Context, settings map[string]string) error {
dep := dependency.FromContext(ctx)
dep.TextExtractor(context.WithValue(ctx, dependency.ReloadCtx{}, true))
return nil
}

@ -450,3 +450,34 @@ func (service *SetDownloadFilesService) SetDownloadFiles(c *gin.Context, taskID
return nil
}
type (
RebuildFTSIndexWorkflowService struct {
FilteredStoragePolicy []int `json:"filtered_storage_policy"`
}
CreateRebuildFTSIndexParamCtx struct{}
)
func (service *RebuildFTSIndexWorkflowService) CreateRebuildFTSIndexTask(c *gin.Context) (*TaskResponse, error) {
dep := dependency.FromContext(c)
user := inventory.UserFromContext(c)
hasher := dep.HashIDEncoder()
m := manager.NewFileManager(dep, user)
defer m.Recycle()
if !user.Edges.Group.Permissions.Enabled(int(types.GroupPermissionIsAdmin)) {
return nil, serializer.NewError(serializer.CodeGroupNotAllowed, "Only admin can import files", nil)
}
// Create task
t, err := workflows.NewRebuildIndexTask(c, user, service.FilteredStoragePolicy)
if err != nil {
return nil, serializer.NewError(serializer.CodeCreateTaskError, "Failed to create task", err)
}
if err := dep.MediaMetaQueue(c).QueueTask(c, t); err != nil {
return nil, serializer.NewError(serializer.CodeCreateTaskError, "Failed to queue task", err)
}
return BuildTaskResponse(t, nil, hasher), nil
}

Loading…
Cancel
Save