package workflows

import (
	"archive/zip"
	"context"
	"encoding/json"
	"fmt"
	"io"
	"os"
	"path/filepath"
	"sync"
	"sync/atomic"
	"time"

	"github.com/cloudreve/Cloudreve/v4/application/dependency"
	"github.com/cloudreve/Cloudreve/v4/ent"
	"github.com/cloudreve/Cloudreve/v4/ent/task"
	"github.com/cloudreve/Cloudreve/v4/inventory"
	"github.com/cloudreve/Cloudreve/v4/inventory/types"
	"github.com/cloudreve/Cloudreve/v4/pkg/cluster"
	"github.com/cloudreve/Cloudreve/v4/pkg/filemanager/fs"
	"github.com/cloudreve/Cloudreve/v4/pkg/filemanager/manager"
	"github.com/cloudreve/Cloudreve/v4/pkg/filemanager/manager/entitysource"
	"github.com/cloudreve/Cloudreve/v4/pkg/hashid"
	"github.com/cloudreve/Cloudreve/v4/pkg/logging"
	"github.com/cloudreve/Cloudreve/v4/pkg/queue"
	"github.com/cloudreve/Cloudreve/v4/pkg/util"
	"github.com/gofrs/uuid"
	"github.com/samber/lo"
)

type (
	CreateArchiveTask struct {
		*queue.DBTask

		l        logging.Logger
		state    *CreateArchiveTaskState
		progress queue.Progresses
		node     cluster.Node
	}

	CreateArchiveTaskPhase string

	CreateArchiveTaskState struct {
		Uris               []string                     `json:"uris,omitempty"`
		Dst                string                       `json:"dst,omitempty"`
		TempPath           string                       `json:"temp_path,omitempty"`
		ArchiveFile        string                       `json:"archive_file,omitempty"`
		Phase              CreateArchiveTaskPhase       `json:"phase,omitempty"`
		SlaveUploadTaskID  int                          `json:"slave__upload_task_id,omitempty"`
		SlaveArchiveTaskID int                          `json:"slave__archive_task_id,omitempty"`
		SlaveCompressState *SlaveCreateArchiveTaskState `json:"slave_compress_state,omitempty"`
		Failed             int                          `json:"failed,omitempty"`
		NodeState          `json:",inline"`
	}
)

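// A CreateArchiveTask runs in one of two flows depending on the node
// allocated for it: on the master node it compresses and uploads locally;
// otherwise it delegates compression and upload to a slave node and polls
// that node through the phases below.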
const (
	// Phases when the task runs on the master node.
	CreateArchiveTaskPhaseNotStarted    CreateArchiveTaskPhase = "not_started"
	CreateArchiveTaskPhaseCompressFiles CreateArchiveTaskPhase = "compress_files"
	CreateArchiveTaskPhaseUploadArchive CreateArchiveTaskPhase = "upload_archive"

	// Phases when compression and upload are delegated to a slave node.
	CreateArchiveTaskPhaseAwaitSlaveCompressing        CreateArchiveTaskPhase = "await_slave_compressing"
	CreateArchiveTaskPhaseCreateAndAwaitSlaveUploading CreateArchiveTaskPhase = "await_slave_uploading"
	CreateArchiveTaskPhaseCompleteUpload               CreateArchiveTaskPhase = "complete_upload"

	// Keys used in the progress map.
	ProgressTypeArchiveCount = "archive_count"
	ProgressTypeArchiveSize  = "archive_size"
	ProgressTypeUpload       = "upload"
	ProgressTypeUploadCount  = "upload_count"
)

func init() {
	queue.RegisterResumableTaskFactory(queue.CreateArchiveTaskType, NewCreateArchiveTaskFromModel)
}

// NewCreateArchiveTask creates a new CreateArchiveTask that compresses the
// files under the src URIs into a zip archive saved to dst.
func NewCreateArchiveTask(ctx context.Context, src []string, dst string) (queue.Task, error) {
	state := &CreateArchiveTaskState{
		Uris:      src,
		Dst:       dst,
		NodeState: NodeState{},
	}
	stateBytes, err := json.Marshal(state)
	if err != nil {
		return nil, fmt.Errorf("failed to marshal state: %w", err)
	}

	t := &CreateArchiveTask{
		DBTask: &queue.DBTask{
			Task: &ent.Task{
				Type:          queue.CreateArchiveTaskType,
				CorrelationID: logging.CorrelationID(ctx),
				PrivateState:  string(stateBytes),
				PublicState:   &types.TaskPublicState{},
			},
			DirectOwner: inventory.UserFromContext(ctx),
		},
	}
	return t, nil
}

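// NewCreateArchiveTaskFromModel restores a CreateArchiveTask from its
// persisted task model so the queue can resume it.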
func NewCreateArchiveTaskFromModel(task *ent.Task) queue.Task {
	return &CreateArchiveTask{
		DBTask: &queue.DBTask{
			Task: task,
		},
	}
}

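// Do advances the task state machine by one phase. It returns
// task.StatusSuspending between phases and persists the updated private
// state before returning, so the task survives restarts.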
func (m *CreateArchiveTask) Do(ctx context.Context) (task.Status, error) {
	dep := dependency.FromContext(ctx)
	m.l = dep.Logger()

	m.Lock()
	if m.progress == nil {
		m.progress = make(queue.Progresses)
	}
	m.Unlock()

	// Unmarshal state
	state := &CreateArchiveTaskState{}
	if err := json.Unmarshal([]byte(m.State()), state); err != nil {
		return task.StatusError, fmt.Errorf("failed to unmarshal state: %w", err)
	}
	m.state = state

	// Select a node capable of creating archives
	node, err := allocateNode(ctx, dep, &m.state.NodeState, types.NodeCapabilityCreateArchive)
	if err != nil {
		return task.StatusError, fmt.Errorf("failed to allocate node: %w", err)
	}
	m.node = node

	next := task.StatusCompleted

	if m.node.IsMaster() {
		// When running on the master node:
		// 1. Initialize temp folder
		// 2. Compress files
		// 3. Upload the archive to dst
		switch m.state.Phase {
		case CreateArchiveTaskPhaseNotStarted, "":
			next, err = m.initializeTempFolder(ctx, dep)
		case CreateArchiveTaskPhaseCompressFiles:
			next, err = m.createArchiveFile(ctx, dep)
		case CreateArchiveTaskPhaseUploadArchive:
			next, err = m.uploadArchive(ctx, dep)
		default:
			next, err = task.StatusError, fmt.Errorf("unknown phase %q: %w", m.state.Phase, queue.CriticalErr)
		}
	} else {
		// When the allocated node is a slave:
		// 1. List all files and send them to the slave node for compressing
		// 2. Await compressing, then create a slave upload task
		// 3. Await uploading and complete the upload
		switch m.state.Phase {
		case CreateArchiveTaskPhaseNotStarted, "":
			next, err = m.listEntitiesAndSendToSlave(ctx, dep)
		case CreateArchiveTaskPhaseAwaitSlaveCompressing:
			next, err = m.awaitSlaveCompressing(ctx, dep)
		case CreateArchiveTaskPhaseCreateAndAwaitSlaveUploading:
			next, err = m.createAndAwaitSlaveUploading(ctx, dep)
		case CreateArchiveTaskPhaseCompleteUpload:
			next, err = m.completeUpload(ctx, dep)
		default:
			next, err = task.StatusError, fmt.Errorf("unknown phase %q: %w", m.state.Phase, queue.CriticalErr)
		}
	}

	newStateStr, marshalErr := json.Marshal(m.state)
	if marshalErr != nil {
		return task.StatusError, fmt.Errorf("failed to marshal state: %w", marshalErr)
	}

	m.Lock()
	m.Task.PrivateState = string(newStateStr)
	m.Unlock()
	return next, err
}

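// Cleanup removes the temp folder on the slave node (if one was used) and
// the local temp folder.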
func (m *CreateArchiveTask) Cleanup(ctx context.Context) error {
	if m.state.SlaveCompressState != nil && m.state.SlaveCompressState.TempPath != "" && m.node != nil {
		if err := m.node.CleanupFolders(context.Background(), m.state.SlaveCompressState.TempPath); err != nil {
			m.l.Warning("Failed to cleanup slave temp folder %s: %s", m.state.SlaveCompressState.TempPath, err)
		}
	}

	if m.state.TempPath != "" {
		// Brief delay, presumably to let any lingering handles on the
		// archive be released before the folder is removed.
		time.Sleep(time.Second)
		return os.RemoveAll(m.state.TempPath)
	}

	return nil
}

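// initializeTempFolder creates the local temp workspace and advances the
// task to the compression phase.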
func (m *CreateArchiveTask) initializeTempFolder(ctx context.Context, dep dependency.Dep) (task.Status, error) {
	tempPath, err := prepareTempFolder(ctx, dep, m)
	if err != nil {
		return task.StatusError, fmt.Errorf("failed to prepare temp folder: %w", err)
	}

	m.state.TempPath = tempPath
	m.state.Phase = CreateArchiveTaskPhaseCompressFiles
	m.ResumeAfter(0)
	return task.StatusSuspending, nil
}

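// listEntitiesAndSendToSlave walks the source URIs with a dry-run archive
// pass to collect the file entities and their storage policies, then creates
// a compress task on the slave node with that payload.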
func (m *CreateArchiveTask) listEntitiesAndSendToSlave(ctx context.Context, dep dependency.Dep) (task.Status, error) {
	uris, err := fs.NewUriFromStrings(m.state.Uris...)
	if err != nil {
		return task.StatusError, fmt.Errorf("failed to create uri from strings: %s (%w)", err, queue.CriticalErr)
	}

	payload := &SlaveCreateArchiveTaskState{
		Entities: make([]SlaveCreateArchiveEntity, 0, len(uris)),
		Policies: make(map[int]*ent.StoragePolicy),
	}

	user := inventory.UserFromContext(ctx)
	fm := manager.NewFileManager(dep, user)
	storagePolicyClient := dep.StoragePolicyClient()

	// Dry-run pass: nothing is written (output goes to io.Discard); only the
	// entity list and the set of referenced storage policies are collected.
	failed, err := fm.CreateArchive(ctx, uris, io.Discard,
		fs.WithDryRun(func(name string, e fs.Entity) {
			payload.Entities = append(payload.Entities, SlaveCreateArchiveEntity{
				Entity: e.Model(),
				Path:   name,
			})
			if _, ok := payload.Policies[e.PolicyID()]; !ok {
				policy, err := storagePolicyClient.GetPolicyByID(ctx, e.PolicyID())
				if err != nil {
					m.l.Warning("Failed to get policy %d: %s", e.PolicyID(), err)
				} else {
					payload.Policies[e.PolicyID()] = policy
				}
			}
		}),
		fs.WithMaxArchiveSize(lo.Max(lo.Map(user.Edges.Groups, func(item *ent.Group, index int) int64 {
			return item.Settings.CompressSize
		}))),
	)
	if err != nil {
		return task.StatusError, fmt.Errorf("failed to compress files: %w", err)
	}

	m.state.Failed = failed
	payloadStr, err := json.Marshal(payload)
	if err != nil {
		return task.StatusError, fmt.Errorf("failed to marshal payload: %w", err)
	}

	taskId, err := m.node.CreateTask(ctx, queue.SlaveCreateArchiveTaskType, string(payloadStr))
	if err != nil {
		return task.StatusError, fmt.Errorf("failed to create slave task: %w", err)
	}

	m.state.Phase = CreateArchiveTaskPhaseAwaitSlaveCompressing
	m.state.SlaveArchiveTaskID = taskId
	m.ResumeAfter(10 * time.Second)
	return task.StatusSuspending, nil
}

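// awaitSlaveCompressing polls the compress task on the slave node, mirrors
// its progress, and advances to the upload phase once it completes.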
func (m *CreateArchiveTask) awaitSlaveCompressing(ctx context.Context, dep dependency.Dep) (task.Status, error) {
	t, err := m.node.GetTask(ctx, m.state.SlaveArchiveTaskID, false)
	if err != nil {
		return task.StatusError, fmt.Errorf("failed to get slave task: %w", err)
	}

	m.Lock()
	m.state.NodeState.progress = t.Progress
	m.Unlock()

	m.state.SlaveCompressState = &SlaveCreateArchiveTaskState{}
	if err := json.Unmarshal([]byte(t.PrivateState), m.state.SlaveCompressState); err != nil {
		return task.StatusError, fmt.Errorf("failed to unmarshal slave compress state: %s (%w)", err, queue.CriticalErr)
	}

	if t.Status == task.StatusError {
		return task.StatusError, fmt.Errorf("slave task failed: %s (%w)", t.Error, queue.CriticalErr)
	}

	if t.Status == task.StatusCanceled {
		return task.StatusError, fmt.Errorf("slave task canceled (%w)", queue.CriticalErr)
	}

	if t.Status == task.StatusCompleted {
		m.state.Phase = CreateArchiveTaskPhaseCreateAndAwaitSlaveUploading
		m.ResumeAfter(0)
		return task.StatusSuspending, nil
	}

	m.l.Info("Slave task %d is still compressing, resume after 30s.", m.state.SlaveArchiveTaskID)
	m.ResumeAfter(time.Second * 30)
	return task.StatusSuspending, nil
}

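// createAndAwaitSlaveUploading creates the slave upload task on the first
// call, then polls it until the archive has been uploaded to dst.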
func (m *CreateArchiveTask) createAndAwaitSlaveUploading(ctx context.Context, dep dependency.Dep) (task.Status, error) {
	u := inventory.UserFromContext(ctx)

	if m.state.SlaveUploadTaskID == 0 {
		dst, err := fs.NewUriFromString(m.state.Dst)
		if err != nil {
			return task.StatusError, fmt.Errorf("failed to parse dst uri %q: %s (%w)", m.state.Dst, err, queue.CriticalErr)
		}

		// Create slave upload task
		payload := &SlaveUploadTaskState{
			Files: []SlaveUploadEntity{
				{
					Size: m.state.SlaveCompressState.CompressedSize,
					Uri:  dst,
					Src:  m.state.SlaveCompressState.ZipFilePath,
				},
			},
			MaxParallel: dep.SettingProvider().MaxParallelTransfer(ctx),
			UserID:      u.ID,
		}

		payloadStr, err := json.Marshal(payload)
		if err != nil {
			return task.StatusError, fmt.Errorf("failed to marshal payload: %w", err)
		}

		taskId, err := m.node.CreateTask(ctx, queue.SlaveUploadTaskType, string(payloadStr))
		if err != nil {
			return task.StatusError, fmt.Errorf("failed to create slave task: %w", err)
		}

		m.state.NodeState.progress = nil
		m.state.SlaveUploadTaskID = taskId
		m.ResumeAfter(0)
		return task.StatusSuspending, nil
	}

	m.l.Info("Checking slave upload task %d...", m.state.SlaveUploadTaskID)
	t, err := m.node.GetTask(ctx, m.state.SlaveUploadTaskID, true)
	if err != nil {
		return task.StatusError, fmt.Errorf("failed to get slave task: %w", err)
	}

	m.Lock()
	m.state.NodeState.progress = t.Progress
	m.Unlock()

	if t.Status == task.StatusError {
		return task.StatusError, fmt.Errorf("slave task failed: %s (%w)", t.Error, queue.CriticalErr)
	}

	if t.Status == task.StatusCanceled {
		return task.StatusError, fmt.Errorf("slave task canceled (%w)", queue.CriticalErr)
	}

	if t.Status == task.StatusCompleted {
		m.state.Phase = CreateArchiveTaskPhaseCompleteUpload
		m.ResumeAfter(0)
		return task.StatusSuspending, nil
	}

	m.l.Info("Slave task %d is still uploading, resume after 30s.", m.state.SlaveUploadTaskID)
	m.ResumeAfter(time.Second * 30)
	return task.StatusSuspending, nil
}

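// completeUpload is the terminal phase of the slave flow; all work has
// already been done by the slave node, so the task simply completes.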
func (m *CreateArchiveTask) completeUpload(ctx context.Context, dep dependency.Dep) (task.Status, error) {
	return task.StatusCompleted, nil
}

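// createArchiveFile compresses the source files into a zip file under the
// local temp folder, tracking per-file and per-byte progress.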
func (m *CreateArchiveTask) createArchiveFile(ctx context.Context, dep dependency.Dep) (task.Status, error) {
	uris, err := fs.NewUriFromStrings(m.state.Uris...)
	if err != nil {
		return task.StatusError, fmt.Errorf("failed to create uri from strings: %s (%w)", err, queue.CriticalErr)
	}

	user := inventory.UserFromContext(ctx)
	fm := manager.NewFileManager(dep, user)

	// Create temp zip file
	fileName := fmt.Sprintf("%s.zip", uuid.Must(uuid.NewV4()))
	zipFilePath := filepath.Join(
		m.state.TempPath,
		fileName,
	)
	zipFile, err := util.CreatNestedFile(zipFilePath)
	if err != nil {
		return task.StatusError, fmt.Errorf("failed to create zip file: %w", err)
	}

	defer zipFile.Close()

	// Start compressing
	m.Lock()
	m.progress[ProgressTypeArchiveCount] = &queue.Progress{}
	m.progress[ProgressTypeArchiveSize] = &queue.Progress{}
	m.Unlock()
	failed, err := fm.CreateArchive(ctx, uris, zipFile,
		fs.WithArchiveCompression(true),
		fs.WithMaxArchiveSize(lo.Max(lo.Map(user.Edges.Groups, func(item *ent.Group, index int) int64 {
			return item.Settings.CompressSize
		}))),
		fs.WithProgressFunc(func(current, diff int64, total int64) {
			atomic.AddInt64(&m.progress[ProgressTypeArchiveSize].Current, diff)
			atomic.AddInt64(&m.progress[ProgressTypeArchiveCount].Current, 1)
		}),
	)
	if err != nil {
		// Close before removing so the file handle is released; the deferred
		// Close on the already-closed file is harmless.
		zipFile.Close()
		_ = os.Remove(zipFilePath)
		return task.StatusError, fmt.Errorf("failed to compress files: %w", err)
	}

	m.state.Failed = failed
	m.Lock()
	delete(m.progress, ProgressTypeArchiveSize)
	delete(m.progress, ProgressTypeArchiveCount)
	m.Unlock()

	m.state.Phase = CreateArchiveTaskPhaseUploadArchive
	m.state.ArchiveFile = fileName
	m.ResumeAfter(0)
	return task.StatusSuspending, nil
}

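// uploadArchive streams the finished zip file from the temp folder to the
// destination URI via the file manager, reporting upload progress.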
func (m *CreateArchiveTask) uploadArchive(ctx context.Context, dep dependency.Dep) (task.Status, error) {
	fm := manager.NewFileManager(dep, inventory.UserFromContext(ctx))
	zipFilePath := filepath.Join(
		m.state.TempPath,
		m.state.ArchiveFile,
	)

	m.l.Info("Uploading archive file %s to %s...", zipFilePath, m.state.Dst)

	uri, err := fs.NewUriFromString(m.state.Dst)
	if err != nil {
		return task.StatusError, fmt.Errorf(
			"failed to parse dst uri %q: %s (%w)",
			m.state.Dst,
			err,
			queue.CriticalErr,
		)
	}

	file, err := os.Open(zipFilePath)
	if err != nil {
		return task.StatusError, fmt.Errorf("failed to open compressed archive %q: %s", m.state.ArchiveFile, err)
	}
	defer file.Close()
	fi, err := file.Stat()
	if err != nil {
		return task.StatusError, fmt.Errorf("failed to get file info: %w", err)
	}
	size := fi.Size()

	m.Lock()
	m.progress[ProgressTypeUpload] = &queue.Progress{}
	m.Unlock()
	fileData := &fs.UploadRequest{
		Props: &fs.UploadProps{
			Uri:  uri,
			Size: size,
		},
		ProgressFunc: func(current, diff int64, total int64) {
			atomic.StoreInt64(&m.progress[ProgressTypeUpload].Current, current)
			atomic.StoreInt64(&m.progress[ProgressTypeUpload].Total, total)
		},
		File:   file,
		Seeker: file,
	}

	_, err = fm.Update(ctx, fileData)
	if err != nil {
		return task.StatusError, fmt.Errorf("failed to upload archive file: %w", err)
	}

	return task.StatusCompleted, nil
}

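// Progress returns the local progress map, merged with any progress
// mirrored from the slave node when one is involved.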
func (m *CreateArchiveTask) Progress(ctx context.Context) queue.Progresses {
	m.Lock()
	defer m.Unlock()

	if m.state.NodeState.progress != nil {
		merged := make(queue.Progresses)
		for k, v := range m.progress {
			merged[k] = v
		}

		for k, v := range m.state.NodeState.progress {
			merged[k] = v
		}

		return merged
	}
	return m.progress
}

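// Summarize reports the task's sources, destination, phase, and failure
// count for display; when a slave node did the compressing, its failure
// count takes precedence.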
func (m *CreateArchiveTask) Summarize(hasher hashid.Encoder) *queue.Summary {
	// Unmarshal state if it hasn't been loaded yet
	if m.state == nil {
		if err := json.Unmarshal([]byte(m.State()), &m.state); err != nil {
			return nil
		}
	}

	failed := m.state.Failed
	if m.state.SlaveCompressState != nil {
		failed = m.state.SlaveCompressState.Failed
	}

	return &queue.Summary{
		NodeID: m.state.NodeID,
		Phase:  string(m.state.Phase),
		Props: map[string]any{
			SummaryKeySrcMultiple: m.state.Uris,
			SummaryKeyDst:         m.state.Dst,
			SummaryKeyFailed:      failed,
		},
	}
}

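// Slave-side counterparts: the master serializes a
// SlaveCreateArchiveTaskState as the payload for a SlaveCreateArchiveTask
// running on the slave node, and reads it back once the task completes.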
type (
	SlaveCreateArchiveEntity struct {
		Entity *ent.Entity `json:"entity"`
		Path   string      `json:"path"`
	}

	SlaveCreateArchiveTaskState struct {
		Entities       []SlaveCreateArchiveEntity `json:"entities"`
		Policies       map[int]*ent.StoragePolicy `json:"policies"`
		CompressedSize int64                      `json:"compressed_size"`
		TempPath       string                     `json:"temp_path"`
		ZipFilePath    string                     `json:"zip_file_path"`
		Failed         int                        `json:"failed"`
	}

	SlaveCreateArchiveTask struct {
		*queue.InMemoryTask

		mu       sync.RWMutex
		progress queue.Progresses
		l        logging.Logger
		state    *SlaveCreateArchiveTaskState
	}
)

// NewSlaveCreateArchiveTask creates a new SlaveCreateArchiveTask from raw private state
func NewSlaveCreateArchiveTask(ctx context.Context, props *types.SlaveTaskProps, id int, state string) queue.Task {
	return &SlaveCreateArchiveTask{
		InMemoryTask: &queue.InMemoryTask{
			DBTask: &queue.DBTask{
				Task: &ent.Task{
					ID:            id,
					CorrelationID: logging.CorrelationID(ctx),
					PublicState: &types.TaskPublicState{
						SlaveTaskProps: props,
					},
					PrivateState: state,
				},
			},
		},

		progress: make(queue.Progresses),
	}
}

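// Do performs the slave side of archive creation: it downloads each entity
// from its storage policy, streams it into a zip file in a local temp
// folder, and records the zip path and compressed size in the task's private
// state for the master to read back.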
func (t *SlaveCreateArchiveTask) Do(ctx context.Context) (task.Status, error) {
	ctx = prepareSlaveTaskCtx(ctx, t.Model().PublicState.SlaveTaskProps)
	dep := dependency.FromContext(ctx)
	t.l = dep.Logger()
	fm := manager.NewFileManager(dep, nil)

	// Unmarshal state
	state := &SlaveCreateArchiveTaskState{}
	if err := json.Unmarshal([]byte(t.State()), state); err != nil {
		return task.StatusError, fmt.Errorf("failed to unmarshal state: %w", err)
	}

	t.state = state

	totalFiles := int64(0)
	totalFileSize := int64(0)
	for _, e := range t.state.Entities {
		totalFiles++
		totalFileSize += e.Entity.Size
	}

	t.Lock()
	t.progress[ProgressTypeArchiveCount] = &queue.Progress{Total: totalFiles}
	t.progress[ProgressTypeArchiveSize] = &queue.Progress{Total: totalFileSize}
	t.Unlock()

	// 1. Create temp workspace
	tempPath, err := prepareTempFolder(ctx, dep, t)
	if err != nil {
		return task.StatusError, fmt.Errorf("failed to prepare temp folder: %w", err)
	}
	t.state.TempPath = tempPath

	// 2. Create archive file
	fileName := fmt.Sprintf("%s.zip", uuid.Must(uuid.NewV4()))
	zipFilePath := filepath.Join(
		t.state.TempPath,
		fileName,
	)
	zipFile, err := util.CreatNestedFile(zipFilePath)
	if err != nil {
		return task.StatusError, fmt.Errorf("failed to create zip file: %w", err)
	}

	defer zipFile.Close()

	zipWriter := zip.NewWriter(zipFile)
	defer zipWriter.Close()

	// 3. Download each entity and write it into the zip file
	for _, e := range t.state.Entities {
		policy, ok := t.state.Policies[e.Entity.StoragePolicyEntities]
		if !ok {
			state.Failed++
			t.l.Warning("Policy not found for entity %d, skipping...", e.Entity.ID)
			continue
		}

		entity := fs.NewEntity(e.Entity)
		es, err := fm.GetEntitySource(ctx, 0,
			fs.WithEntity(entity),
			fs.WithPolicy(fm.CastStoragePolicyOnSlave(ctx, policy)),
		)
		if err != nil {
			state.Failed++
			t.l.Warning("Failed to get entity source for entity %d: %s, skipping...", e.Entity.ID, err)
			continue
		}

		// Write to zip file
		header := &zip.FileHeader{
			Name:               e.Path,
			Modified:           entity.UpdatedAt(),
			UncompressedSize64: uint64(entity.Size()),
			Method:             zip.Deflate,
		}

		writer, err := zipWriter.CreateHeader(header)
		if err != nil {
			es.Close()
			state.Failed++
			t.l.Warning("Failed to create zip header for %s: %s, skipping...", e.Path, err)
			continue
		}

		es.Apply(entitysource.WithContext(ctx))
		_, err = io.Copy(writer, es)
		es.Close()
		if err != nil {
			state.Failed++
			t.l.Warning("Failed to write entity %d to zip file: %s, skipping...", e.Entity.ID, err)
		}

		atomic.AddInt64(&t.progress[ProgressTypeArchiveSize].Current, entity.Size())
		atomic.AddInt64(&t.progress[ProgressTypeArchiveCount].Current, 1)
	}

	// Flush the zip central directory before reading the final size; the
	// deferred Close on the already-closed writer is harmless.
	zipWriter.Close()
	stat, err := zipFile.Stat()
	if err != nil {
		return task.StatusError, fmt.Errorf("failed to get compressed file info: %w", err)
	}

	t.state.CompressedSize = stat.Size()
	t.state.ZipFilePath = zipFilePath
	// Clear unused fields to save space
	t.state.Entities = nil
	t.state.Policies = nil

	newStateStr, marshalErr := json.Marshal(t.state)
	if marshalErr != nil {
		return task.StatusError, fmt.Errorf("failed to marshal state: %w", marshalErr)
	}

	t.Lock()
	t.Task.PrivateState = string(newStateStr)
	t.Unlock()
	return task.StatusCompleted, nil
}

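// Progress returns the slave task's progress map.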
func (m *SlaveCreateArchiveTask) Progress(ctx context.Context) queue.Progresses {
	m.Lock()
	defer m.Unlock()

	return m.progress
}