From f01ed64bdbd5b56e9a8cb5ddd637cd0375cec13e Mon Sep 17 00:00:00 2001 From: Aaron Liu Date: Wed, 14 Jan 2026 11:32:22 +0800 Subject: [PATCH] feat(perf): improve memory usage for importing task / add configurable Pprof endpoint (fix #3059) --- application/application.go | 37 +++++++++++++++++---- assets | 2 +- pkg/conf/types.go | 1 + pkg/filemanager/workflows/import.go | 51 +++++++++++++++++++++++++---- 4 files changed, 77 insertions(+), 14 deletions(-) diff --git a/application/application.go b/application/application.go index 591faa28..0380872a 100644 --- a/application/application.go +++ b/application/application.go @@ -6,6 +6,7 @@ import ( "fmt" "net" "net/http" + _ "net/http/pprof" "os" "time" @@ -41,13 +42,14 @@ func NewServer(dep dependency.Dep) Server { } type server struct { - dep dependency.Dep - logger logging.Logger - dbClient *ent.Client - config conf.ConfigProvider - server *http.Server - kv cache.Driver - mailQueue email.Driver + dep dependency.Dep + logger logging.Logger + dbClient *ent.Client + config conf.ConfigProvider + server *http.Server + pprofServer *http.Server + kv cache.Driver + mailQueue email.Driver } func (s *server) PrintBanner() { @@ -127,6 +129,20 @@ func (s *server) Start() error { api.TrustedPlatform = s.config.System().ProxyHeader s.server = &http.Server{Handler: api} + // Start pprof server if configured + if pprofAddr := s.config.System().Pprof; pprofAddr != "" { + s.pprofServer = &http.Server{ + Addr: pprofAddr, + Handler: http.DefaultServeMux, + } + go func() { + s.logger.Info("pprof server listening on %q", pprofAddr) + if err := s.pprofServer.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { + s.logger.Error("pprof server error: %s", err) + } + }() + } + // 如果启用了SSL if s.config.SSL().CertPath != "" { s.logger.Info("Listening to %q", s.config.SSL().Listen) @@ -188,6 +204,13 @@ func (s *server) Close() { } } + // Shutdown pprof server + if s.pprofServer != nil { + if err := s.pprofServer.Shutdown(ctx); err != nil { + s.logger.Error("Failed to shutdown pprof server: %s", err) + } + } + if s.kv != nil { if err := s.kv.Persist(util.DataPath(cache.DefaultCacheFile)); err != nil { s.logger.Warning("Failed to persist cache: %s", err) diff --git a/assets b/assets index 64da85e5..45b90c4f 160000 --- a/assets +++ b/assets @@ -1 +1 @@ -Subproject commit 64da85e59527f01271a30d775d99cb8e18e80aee +Subproject commit 45b90c4fc58be77b3e698d41e90c134a5481cb48 diff --git a/pkg/conf/types.go b/pkg/conf/types.go index bdf6f80b..5fdda23c 100644 --- a/pkg/conf/types.go +++ b/pkg/conf/types.go @@ -48,6 +48,7 @@ type System struct { GracePeriod int `validate:"gte=0"` ProxyHeader string LogLevel string `validate:"oneof=debug info warning error"` + Pprof string // Address to listen for pprof, e.g. "localhost:6060". Empty to disable. } type SSL struct { diff --git a/pkg/filemanager/workflows/import.go b/pkg/filemanager/workflows/import.go index 84b0ed3c..6a7a8817 100644 --- a/pkg/filemanager/workflows/import.go +++ b/pkg/filemanager/workflows/import.go @@ -5,6 +5,7 @@ import ( "encoding/json" "errors" "fmt" + "runtime" "sync/atomic" "github.com/cloudreve/Cloudreve/v4/application/dependency" @@ -43,6 +44,10 @@ type ( const ( ProgressTypeImported = "imported" ProgressTypeIndexed = "indexed" + + // ImportBatchSize is the number of files to process in each batch + // to control memory usage during large imports. + ImportBatchSize = 100 ) func init() { @@ -117,19 +122,19 @@ func (m *ImportTask) Do(ctx context.Context) (task.Status, error) { func (m *ImportTask) processImport(ctx context.Context, dep dependency.Dep) (task.Status, error) { user := inventory.UserFromContext(ctx) - fm := manager.NewFileManager(dep, user) - defer fm.Recycle() - failed := 0 dst, err := fs.NewUriFromString(m.state.Dst) if err != nil { return task.StatusError, fmt.Errorf("failed to parse dst: %s (%w)", err, queue.CriticalErr) } - physicalFiles, err := fm.ListPhysical(ctx, m.state.Src, m.state.PolicyID, m.state.Recursive, + // Use a temporary file manager just for listing physical files + listFm := manager.NewFileManager(dep, user) + physicalFiles, err := listFm.ListPhysical(ctx, m.state.Src, m.state.PolicyID, m.state.Recursive, func(i int) { atomic.AddInt64(&m.progress[ProgressTypeIndexed].Current, int64(i)) }) + listFm.Recycle() if err != nil { return task.StatusError, fmt.Errorf("failed to list physical files: %w", err) } @@ -143,7 +148,41 @@ func (m *ImportTask) processImport(ctx context.Context, dep dependency.Dep) (tas delete(m.progress, ProgressTypeIndexed) m.Unlock() - for _, physicalFile := range physicalFiles { + failed := 0 + totalFiles := len(physicalFiles) + + // Process files in batches to control memory usage + for batchStart := 0; batchStart < totalFiles; batchStart += ImportBatchSize { + batchEnd := min(batchStart+ImportBatchSize, totalFiles) + + batch := physicalFiles[batchStart:batchEnd] + batchFailed := m.processBatch(ctx, dep, user, dst, batch) + failed += batchFailed + + // Clear batch elements to allow GC of individual items + for i := batchStart; i < batchEnd; i++ { + physicalFiles[i] = fs.PhysicalObject{} + } + + // Run GC after each batch to free memory + runtime.GC() + } + + // Clear the entire slice to allow GC + physicalFiles = nil + runtime.GC() + + m.state.Failed = failed + return task.StatusCompleted, nil +} + +// processBatch processes a batch of physical files with a fresh file manager. +func (m *ImportTask) processBatch(ctx context.Context, dep dependency.Dep, user *ent.User, dst *fs.URI, batch []fs.PhysicalObject) int { + fm := manager.NewFileManager(dep, user) + defer fm.Recycle() + + failed := 0 + for _, physicalFile := range batch { if physicalFile.IsDir { m.l.Info("Creating folder %s", physicalFile.RelativePath) _, err := fm.Create(ctx, dst.Join(physicalFile.RelativePath), types.FileTypeFolder) @@ -168,7 +207,7 @@ func (m *ImportTask) processImport(ctx context.Context, dep dependency.Dep) (tas } } - return task.StatusCompleted, nil + return failed } func (m *ImportTask) Progress(ctx context.Context) queue.Progresses {