feat(perf): improve memory usage for importing task / add configurable Pprof endpoint (fix #3059)

master
Aaron Liu 3 hours ago
parent 736414fa10
commit f01ed64bdb

@ -6,6 +6,7 @@ import (
"fmt"
"net"
"net/http"
_ "net/http/pprof"
"os"
"time"
@ -41,13 +42,14 @@ func NewServer(dep dependency.Dep) Server {
}
type server struct {
dep dependency.Dep
logger logging.Logger
dbClient *ent.Client
config conf.ConfigProvider
server *http.Server
kv cache.Driver
mailQueue email.Driver
dep dependency.Dep
logger logging.Logger
dbClient *ent.Client
config conf.ConfigProvider
server *http.Server
pprofServer *http.Server
kv cache.Driver
mailQueue email.Driver
}
func (s *server) PrintBanner() {
@ -127,6 +129,20 @@ func (s *server) Start() error {
api.TrustedPlatform = s.config.System().ProxyHeader
s.server = &http.Server{Handler: api}
// Start pprof server if configured
if pprofAddr := s.config.System().Pprof; pprofAddr != "" {
s.pprofServer = &http.Server{
Addr: pprofAddr,
Handler: http.DefaultServeMux,
}
go func() {
s.logger.Info("pprof server listening on %q", pprofAddr)
if err := s.pprofServer.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
s.logger.Error("pprof server error: %s", err)
}
}()
}
// 如果启用了SSL
if s.config.SSL().CertPath != "" {
s.logger.Info("Listening to %q", s.config.SSL().Listen)
@ -188,6 +204,13 @@ func (s *server) Close() {
}
}
// Shutdown pprof server
if s.pprofServer != nil {
if err := s.pprofServer.Shutdown(ctx); err != nil {
s.logger.Error("Failed to shutdown pprof server: %s", err)
}
}
if s.kv != nil {
if err := s.kv.Persist(util.DataPath(cache.DefaultCacheFile)); err != nil {
s.logger.Warning("Failed to persist cache: %s", err)

@ -1 +1 @@
Subproject commit 64da85e59527f01271a30d775d99cb8e18e80aee
Subproject commit 45b90c4fc58be77b3e698d41e90c134a5481cb48

@ -48,6 +48,7 @@ type System struct {
GracePeriod int `validate:"gte=0"`
ProxyHeader string
LogLevel string `validate:"oneof=debug info warning error"`
Pprof string // Address to listen for pprof, e.g. "localhost:6060". Empty to disable.
}
type SSL struct {

@ -5,6 +5,7 @@ import (
"encoding/json"
"errors"
"fmt"
"runtime"
"sync/atomic"
"github.com/cloudreve/Cloudreve/v4/application/dependency"
@ -43,6 +44,10 @@ type (
const (
ProgressTypeImported = "imported"
ProgressTypeIndexed = "indexed"
// ImportBatchSize is the number of files to process in each batch
// to control memory usage during large imports.
ImportBatchSize = 100
)
func init() {
@ -117,19 +122,19 @@ func (m *ImportTask) Do(ctx context.Context) (task.Status, error) {
func (m *ImportTask) processImport(ctx context.Context, dep dependency.Dep) (task.Status, error) {
user := inventory.UserFromContext(ctx)
fm := manager.NewFileManager(dep, user)
defer fm.Recycle()
failed := 0
dst, err := fs.NewUriFromString(m.state.Dst)
if err != nil {
return task.StatusError, fmt.Errorf("failed to parse dst: %s (%w)", err, queue.CriticalErr)
}
physicalFiles, err := fm.ListPhysical(ctx, m.state.Src, m.state.PolicyID, m.state.Recursive,
// Use a temporary file manager just for listing physical files
listFm := manager.NewFileManager(dep, user)
physicalFiles, err := listFm.ListPhysical(ctx, m.state.Src, m.state.PolicyID, m.state.Recursive,
func(i int) {
atomic.AddInt64(&m.progress[ProgressTypeIndexed].Current, int64(i))
})
listFm.Recycle()
if err != nil {
return task.StatusError, fmt.Errorf("failed to list physical files: %w", err)
}
@ -143,7 +148,41 @@ func (m *ImportTask) processImport(ctx context.Context, dep dependency.Dep) (tas
delete(m.progress, ProgressTypeIndexed)
m.Unlock()
for _, physicalFile := range physicalFiles {
failed := 0
totalFiles := len(physicalFiles)
// Process files in batches to control memory usage
for batchStart := 0; batchStart < totalFiles; batchStart += ImportBatchSize {
batchEnd := min(batchStart+ImportBatchSize, totalFiles)
batch := physicalFiles[batchStart:batchEnd]
batchFailed := m.processBatch(ctx, dep, user, dst, batch)
failed += batchFailed
// Clear batch elements to allow GC of individual items
for i := batchStart; i < batchEnd; i++ {
physicalFiles[i] = fs.PhysicalObject{}
}
// Run GC after each batch to free memory
runtime.GC()
}
// Clear the entire slice to allow GC
physicalFiles = nil
runtime.GC()
m.state.Failed = failed
return task.StatusCompleted, nil
}
// processBatch processes a batch of physical files with a fresh file manager.
func (m *ImportTask) processBatch(ctx context.Context, dep dependency.Dep, user *ent.User, dst *fs.URI, batch []fs.PhysicalObject) int {
fm := manager.NewFileManager(dep, user)
defer fm.Recycle()
failed := 0
for _, physicalFile := range batch {
if physicalFile.IsDir {
m.l.Info("Creating folder %s", physicalFile.RelativePath)
_, err := fm.Create(ctx, dst.Join(physicalFile.RelativePath), types.FileTypeFolder)
@ -168,7 +207,7 @@ func (m *ImportTask) processImport(ctx context.Context, dep dependency.Dep) (tas
}
}
return task.StatusCompleted, nil
return failed
}
func (m *ImportTask) Progress(ctx context.Context) queue.Progresses {

Loading…
Cancel
Save