Merge branch 'master' into chore/conf/env_support

pull/1475/head
AHdark 3 years ago committed by GitHub
commit 77bb767f81
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -7,10 +7,10 @@ jobs:
name: Build
runs-on: ubuntu-18.04
steps:
- name: Set up Go 1.17
- name: Set up Go 1.18
uses: actions/setup-go@v2
with:
go-version: "1.17"
go-version: "1.18"
id: go
- name: Check out code into the Go module directory

@ -12,10 +12,10 @@ jobs:
name: Test
runs-on: ubuntu-18.04
steps:
- name: Set up Go 1.17
- name: Set up Go 1.18
uses: actions/setup-go@v2
with:
go-version: "1.17"
go-version: "1.18"
id: go
- name: Check out code into the Go module directory

@ -1,6 +1,6 @@
language: go
go:
- 1.17.x
- 1.18.x
node_js: "12.16.3"
git:
depth: 1

@ -1,4 +1,4 @@
FROM golang:1.17-alpine as cloudreve_builder
FROM golang:1.18-alpine as cloudreve_builder
# install dependencies and build tools

@ -71,7 +71,7 @@ chmod +x ./cloudreve
## :gear: 构建
自行构建前需要拥有 `Go >= 1.17`、`node.js`、`yarn`、`zip` 等必要依赖。
自行构建前需要拥有 `Go >= 1.18`、`node.js`、`yarn`、`zip` 等必要依赖。
#### 克隆代码

@ -1 +1 @@
Subproject commit a1028e7e0ae96be4bb67d8c117cf39e07c207473
Subproject commit dc81a86ae88b2f64a26bfc34918a22cd0be3429e

Binary file not shown.

@ -0,0 +1,432 @@
// Copyright 2020 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Package embed provides access to files embedded in the running Go program.
//
// Go source files that import "embed" can use the //go:embed directive
// to initialize a variable of type string, []byte, or FS with the contents of
// files read from the package directory or subdirectories at compile time.
//
// For example, here are three ways to embed a file named hello.txt
// and then print its contents at run time.
//
// Embedding one file into a string:
//
// import _ "embed"
//
// //go:embed hello.txt
// var s string
// print(s)
//
// Embedding one file into a slice of bytes:
//
// import _ "embed"
//
// //go:embed hello.txt
// var b []byte
// print(string(b))
//
// Embedded one or more files into a file system:
//
// import "embed"
//
// //go:embed hello.txt
// var f embed.FS
// data, _ := f.ReadFile("hello.txt")
// print(string(data))
//
// # Directives
//
// A //go:embed directive above a variable declaration specifies which files to embed,
// using one or more path.Match patterns.
//
// The directive must immediately precede a line containing the declaration of a single variable.
// Only blank lines and // line comments are permitted between the directive and the declaration.
//
// The type of the variable must be a string type, or a slice of a byte type,
// or FS (or an alias of FS).
//
// For example:
//
// package server
//
// import "embed"
//
// // content holds our static web server content.
// //go:embed image/* template/*
// //go:embed html/index.html
// var content embed.FS
//
// The Go build system will recognize the directives and arrange for the declared variable
// (in the example above, content) to be populated with the matching files from the file system.
//
// The //go:embed directive accepts multiple space-separated patterns for
// brevity, but it can also be repeated, to avoid very long lines when there are
// many patterns. The patterns are interpreted relative to the package directory
// containing the source file. The path separator is a forward slash, even on
// Windows systems. Patterns may not contain . or .. or empty path elements,
// nor may they begin or end with a slash. To match everything in the current
// directory, use * instead of .. To allow for naming files with spaces in
// their names, patterns can be written as Go double-quoted or back-quoted
// string literals.
//
// If a pattern names a directory, all files in the subtree rooted at that directory are
// embedded (recursively), except that files with names beginning with . or _
// are excluded. So the variable in the above example is almost equivalent to:
//
// // content is our static web server content.
// //go:embed image template html/index.html
// var content embed.FS
//
// The difference is that image/* embeds image/.tempfile while image does not.
// Neither embeds image/dir/.tempfile.
//
// If a pattern begins with the prefix all:, then the rule for walking directories is changed
// to include those files beginning with . or _. For example, all:image embeds
// both image/.tempfile and image/dir/.tempfile.
//
// The //go:embed directive can be used with both exported and unexported variables,
// depending on whether the package wants to make the data available to other packages.
// It can only be used with variables at package scope, not with local variables.
//
// Patterns must not match files outside the package's module, such as .git/* or symbolic links.
// Patterns must not match files whose names include the special punctuation characters " * < > ? ` ' | / \ and :.
// Matches for empty directories are ignored. After that, each pattern in a //go:embed line
// must match at least one file or non-empty directory.
//
// If any patterns are invalid or have invalid matches, the build will fail.
//
// # Strings and Bytes
//
// The //go:embed line for a variable of type string or []byte can have only a single pattern,
// and that pattern can match only a single file. The string or []byte is initialized with
// the contents of that file.
//
// The //go:embed directive requires importing "embed", even when using a string or []byte.
// In source files that don't refer to embed.FS, use a blank import (import _ "embed").
//
// # File Systems
//
// For embedding a single file, a variable of type string or []byte is often best.
// The FS type enables embedding a tree of files, such as a directory of static
// web server content, as in the example above.
//
// FS implements the io/fs package's FS interface, so it can be used with any package that
// understands file systems, including net/http, text/template, and html/template.
//
// For example, given the content variable in the example above, we can write:
//
// http.Handle("/static/", http.StripPrefix("/static/", http.FileServer(http.FS(content))))
//
// template.ParseFS(content, "*.tmpl")
//
// # Tools
//
// To support tools that analyze Go packages, the patterns found in //go:embed lines
// are available in “go list” output. See the EmbedPatterns, TestEmbedPatterns,
// and XTestEmbedPatterns fields in the “go help list” output.
package bootstrap
import (
"errors"
"io"
"io/fs"
"time"
)
// An FS is a read-only collection of files, usually initialized with a //go:embed directive.
// When declared without a //go:embed directive, an FS is an empty file system.
//
// An FS is a read-only value, so it is safe to use from multiple goroutines
// simultaneously and also safe to assign values of type FS to each other.
//
// FS implements fs.FS, so it can be used with any package that understands
// file system interfaces, including net/http, text/template, and html/template.
//
// See the package documentation for more details about initializing an FS.
type FS struct {
// The compiler knows the layout of this struct.
// See cmd/compile/internal/staticdata's WriteEmbed.
//
// The files list is sorted by name but not by simple string comparison.
// Instead, each file's name takes the form "dir/elem" or "dir/elem/".
// The optional trailing slash indicates that the file is itself a directory.
// The files list is sorted first by dir (if dir is missing, it is taken to be ".")
// and then by base, so this list of files:
//
// p
// q/
// q/r
// q/s/
// q/s/t
// q/s/u
// q/v
// w
//
// is actually sorted as:
//
// p # dir=. elem=p
// q/ # dir=. elem=q
// w/ # dir=. elem=w
// q/r # dir=q elem=r
// q/s/ # dir=q elem=s
// q/v # dir=q elem=v
// q/s/t # dir=q/s elem=t
// q/s/u # dir=q/s elem=u
//
// This order brings directory contents together in contiguous sections
// of the list, allowing a directory read to use binary search to find
// the relevant sequence of entries.
files *[]file
}
// split splits the name into dir and elem as described in the
// comment in the FS struct above. isDir reports whether the
// final trailing slash was present, indicating that name is a directory.
func split(name string) (dir, elem string, isDir bool) {
if name[len(name)-1] == '/' {
isDir = true
name = name[:len(name)-1]
}
i := len(name) - 1
for i >= 0 && name[i] != '/' {
i--
}
if i < 0 {
return ".", name, isDir
}
return name[:i], name[i+1:], isDir
}
// trimSlash trims a trailing slash from name, if present,
// returning the possibly shortened name.
func trimSlash(name string) string {
if len(name) > 0 && name[len(name)-1] == '/' {
return name[:len(name)-1]
}
return name
}
var (
_ fs.ReadDirFS = FS{}
_ fs.ReadFileFS = FS{}
)
// A file is a single file in the FS.
// It implements fs.FileInfo and fs.DirEntry.
type file struct {
// The compiler knows the layout of this struct.
// See cmd/compile/internal/staticdata's WriteEmbed.
name string
data string
hash [16]byte // truncated SHA256 hash
}
var (
_ fs.FileInfo = (*file)(nil)
_ fs.DirEntry = (*file)(nil)
)
func (f *file) Name() string { _, elem, _ := split(f.name); return elem }
func (f *file) Size() int64 { return int64(len(f.data)) }
func (f *file) ModTime() time.Time { return time.Time{} }
func (f *file) IsDir() bool { _, _, isDir := split(f.name); return isDir }
func (f *file) Sys() any { return nil }
func (f *file) Type() fs.FileMode { return f.Mode().Type() }
func (f *file) Info() (fs.FileInfo, error) { return f, nil }
func (f *file) Mode() fs.FileMode {
if f.IsDir() {
return fs.ModeDir | 0555
}
return 0444
}
// dotFile is a file for the root directory,
// which is omitted from the files list in a FS.
var dotFile = &file{name: "./"}
// lookup returns the named file, or nil if it is not present.
func (f FS) lookup(name string) *file {
if !fs.ValidPath(name) {
// The compiler should never emit a file with an invalid name,
// so this check is not strictly necessary (if name is invalid,
// we shouldn't find a match below), but it's a good backstop anyway.
return nil
}
if name == "." {
return dotFile
}
if f.files == nil {
return nil
}
// Binary search to find where name would be in the list,
// and then check if name is at that position.
dir, elem, _ := split(name)
files := *f.files
i := sortSearch(len(files), func(i int) bool {
idir, ielem, _ := split(files[i].name)
return idir > dir || idir == dir && ielem >= elem
})
if i < len(files) && trimSlash(files[i].name) == name {
return &files[i]
}
return nil
}
// readDir returns the list of files corresponding to the directory dir.
func (f FS) readDir(dir string) []file {
if f.files == nil {
return nil
}
// Binary search to find where dir starts and ends in the list
// and then return that slice of the list.
files := *f.files
i := sortSearch(len(files), func(i int) bool {
idir, _, _ := split(files[i].name)
return idir >= dir
})
j := sortSearch(len(files), func(j int) bool {
jdir, _, _ := split(files[j].name)
return jdir > dir
})
return files[i:j]
}
// Open opens the named file for reading and returns it as an fs.File.
//
// The returned file implements io.Seeker when the file is not a directory.
func (f FS) Open(name string) (fs.File, error) {
file := f.lookup(name)
if file == nil {
return nil, &fs.PathError{Op: "open", Path: name, Err: fs.ErrNotExist}
}
if file.IsDir() {
return &openDir{file, f.readDir(name), 0}, nil
}
return &openFile{file, 0}, nil
}
// ReadDir reads and returns the entire named directory.
func (f FS) ReadDir(name string) ([]fs.DirEntry, error) {
file, err := f.Open(name)
if err != nil {
return nil, err
}
dir, ok := file.(*openDir)
if !ok {
return nil, &fs.PathError{Op: "read", Path: name, Err: errors.New("not a directory")}
}
list := make([]fs.DirEntry, len(dir.files))
for i := range list {
list[i] = &dir.files[i]
}
return list, nil
}
// ReadFile reads and returns the content of the named file.
func (f FS) ReadFile(name string) ([]byte, error) {
file, err := f.Open(name)
if err != nil {
return nil, err
}
ofile, ok := file.(*openFile)
if !ok {
return nil, &fs.PathError{Op: "read", Path: name, Err: errors.New("is a directory")}
}
return []byte(ofile.f.data), nil
}
// An openFile is a regular file open for reading.
type openFile struct {
f *file // the file itself
offset int64 // current read offset
}
var (
_ io.Seeker = (*openFile)(nil)
)
func (f *openFile) Close() error { return nil }
func (f *openFile) Stat() (fs.FileInfo, error) { return f.f, nil }
func (f *openFile) Read(b []byte) (int, error) {
if f.offset >= int64(len(f.f.data)) {
return 0, io.EOF
}
if f.offset < 0 {
return 0, &fs.PathError{Op: "read", Path: f.f.name, Err: fs.ErrInvalid}
}
n := copy(b, f.f.data[f.offset:])
f.offset += int64(n)
return n, nil
}
func (f *openFile) Seek(offset int64, whence int) (int64, error) {
switch whence {
case 0:
// offset += 0
case 1:
offset += f.offset
case 2:
offset += int64(len(f.f.data))
}
if offset < 0 || offset > int64(len(f.f.data)) {
return 0, &fs.PathError{Op: "seek", Path: f.f.name, Err: fs.ErrInvalid}
}
f.offset = offset
return offset, nil
}
// An openDir is a directory open for reading.
type openDir struct {
f *file // the directory file itself
files []file // the directory contents
offset int // the read offset, an index into the files slice
}
func (d *openDir) Close() error { return nil }
func (d *openDir) Stat() (fs.FileInfo, error) { return d.f, nil }
func (d *openDir) Read([]byte) (int, error) {
return 0, &fs.PathError{Op: "read", Path: d.f.name, Err: errors.New("is a directory")}
}
func (d *openDir) ReadDir(count int) ([]fs.DirEntry, error) {
n := len(d.files) - d.offset
if n == 0 {
if count <= 0 {
return nil, nil
}
return nil, io.EOF
}
if count > 0 && n > count {
n = count
}
list := make([]fs.DirEntry, n)
for i := range list {
list[i] = &d.files[d.offset+i]
}
d.offset += n
return list, nil
}
// sortSearch is like sort.Search, avoiding an import.
func sortSearch(n int, f func(int) bool) int {
// Define f(-1) == false and f(n) == true.
// Invariant: f(i-1) == false, f(j) == true.
i, j := 0, n
for i < j {
h := int(uint(i+j) >> 1) // avoid overflow when computing h
// i ≤ h < j
if !f(h) {
i = h + 1 // preserves f(i-1) == false
} else {
j = h // preserves f(j) == true
}
}
// i == j, f(i-1) == false, and f(j) (= f(i)) == true => answer is i.
return i
}

@ -0,0 +1,75 @@
package bootstrap
import (
"archive/zip"
"crypto/sha256"
"github.com/cloudreve/Cloudreve/v3/pkg/util"
"github.com/pkg/errors"
"io"
"io/fs"
"sort"
"strings"
)
func NewFS(zipContent string) fs.FS {
zipReader, err := zip.NewReader(strings.NewReader(zipContent), int64(len(zipContent)))
if err != nil {
util.Log().Panic("Static resource is not a valid zip file: %s", err)
}
var files []file
err = fs.WalkDir(zipReader, ".", func(path string, d fs.DirEntry, err error) error {
if err != nil {
return errors.Errorf("无法获取[%s]的信息, %s, 跳过...", path, err)
}
if path == "." {
return nil
}
var f file
if d.IsDir() {
f.name = path + "/"
} else {
f.name = path
rc, err := zipReader.Open(path)
if err != nil {
return errors.Errorf("无法打开文件[%s], %s, 跳过...", path, err)
}
defer rc.Close()
data, err := io.ReadAll(rc)
if err != nil {
return errors.Errorf("无法读取文件[%s], %s, 跳过...", path, err)
}
f.data = string(data)
hash := sha256.Sum256(data)
for i := range f.hash {
f.hash[i] = ^hash[i]
}
}
files = append(files, f)
return nil
})
if err != nil {
util.Log().Panic("初始化静态资源失败: %s", err)
}
sort.Slice(files, func(i, j int) bool {
fi, fj := files[i], files[j]
di, ei, _ := split(fi.name)
dj, ej, _ := split(fj.name)
if di != dj {
return di < dj
}
return ei < ej
})
var embedFS FS
embedFS.files = &files
return embedFS
}

@ -10,9 +10,9 @@ func RunScript(name string) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
if err := invoker.RunDBScript(name, ctx); err != nil {
util.Log().Error("数据库脚本执行失败: %s", err)
util.Log().Error("Failed to execute database script: %s", err)
return
}
util.Log().Info("数据库脚本 [%s] 执行完毕", name)
util.Log().Info("Finish executing database script %q.", name)
}

@ -46,13 +46,13 @@ func (b *GinFS) Exists(prefix string, filepath string) bool {
// InitStatic 初始化静态资源文件
func InitStatic(statics fs.FS) {
if util.Exists(util.RelativePath(StaticFolder)) {
util.Log().Info("检测到 statics 目录存在,将使用此目录下的静态资源文件")
util.Log().Info("Folder with name \"statics\" already exists, it will be used to serve static files.")
StaticFS = static.LocalFile(util.RelativePath("statics"), false)
} else {
// 初始化静态资源
embedFS, err := fs.Sub(statics, "assets/build")
if err != nil {
util.Log().Panic("无法初始化静态资源, %s", err)
util.Log().Panic("Failed to initialize static resources: %s", err)
}
StaticFS = &GinFS{
@ -62,19 +62,19 @@ func InitStatic(statics fs.FS) {
// 检查静态资源的版本
f, err := StaticFS.Open("version.json")
if err != nil {
util.Log().Warning("静态资源版本标识文件不存在,请重新构建或删除 statics 目录")
util.Log().Warning("Missing version identifier file in static resources, please delete \"statics\" folder and rebuild it.")
return
}
b, err := io.ReadAll(f)
if err != nil {
util.Log().Warning("无法读取静态资源文件版本,请重新构建或删除 statics 目录")
util.Log().Warning("Failed to read version identifier file in static resources, please delete \"statics\" folder and rebuild it.")
return
}
var v staticVersion
if err := json.Unmarshal(b, &v); err != nil {
util.Log().Warning("无法解析静态资源文件版本, %s", err)
util.Log().Warning("Failed to parse version identifier file in static resources: %s", err)
return
}
@ -84,12 +84,12 @@ func InitStatic(statics fs.FS) {
}
if v.Name != staticName {
util.Log().Warning("静态资源版本不匹配,请重新构建或删除 statics 目录")
util.Log().Warning("Static resource version mismatch, please delete \"statics\" folder and rebuild it.")
return
}
if v.Version != conf.RequiredStaticVersion {
util.Log().Warning("静态资源版本不匹配 [当前 %s, 需要: %s],请重新构建或删除 statics 目录", v.Version, conf.RequiredStaticVersion)
util.Log().Warning("Static resource version mismatch [Current %s, Desired: %s]please delete \"statics\" folder and rebuild it.", v.Version, conf.RequiredStaticVersion)
return
}
}
@ -99,13 +99,13 @@ func Eject(statics fs.FS) {
// 初始化静态资源
embedFS, err := fs.Sub(statics, "assets/build")
if err != nil {
util.Log().Panic("无法初始化静态资源, %s", err)
util.Log().Panic("Failed to initialize static resources: %s", err)
}
var walk func(relPath string, d fs.DirEntry, err error) error
walk = func(relPath string, d fs.DirEntry, err error) error {
if err != nil {
return errors.Errorf("无法获取[%s]的信息, %s, 跳过...", relPath, err)
return errors.Errorf("Failed to read info of %q: %s, skipping...", relPath, err)
}
if !d.IsDir() {
@ -114,13 +114,13 @@ func Eject(statics fs.FS) {
defer out.Close()
if err != nil {
return errors.Errorf("无法创建文件[%s], %s, 跳过...", relPath, err)
return errors.Errorf("Failed to create file %q: %s, skipping...", relPath, err)
}
util.Log().Info("导出 [%s]...", relPath)
util.Log().Info("Ejecting %q...", relPath)
obj, _ := embedFS.Open(relPath)
if _, err := io.Copy(out, bufio.NewReader(obj)); err != nil {
return errors.Errorf("无法写入文件[%s], %s, 跳过...", relPath, err)
return errors.Errorf("Cannot write file %q: %s, skipping...", relPath, err)
}
}
return nil
@ -129,8 +129,8 @@ func Eject(statics fs.FS) {
// util.Log().Info("开始导出内置静态资源...")
err = fs.WalkDir(embedFS, ".", walk)
if err != nil {
util.Log().Error("导出内置静态资源遇到错误:%s", err)
util.Log().Error("Error occurs while ejecting static resources: %s", err)
return
}
util.Log().Info("内置静态资源导出完成")
util.Log().Info("Finish ejecting static resources.")
}

@ -1,6 +1,6 @@
module github.com/cloudreve/Cloudreve/v3
go 1.17
go 1.18
require (
github.com/DATA-DOG/go-sqlmock v1.3.3
@ -99,6 +99,7 @@ require (
github.com/mattn/go-colorable v0.1.4 // indirect
github.com/mattn/go-isatty v0.0.12 // indirect
github.com/mattn/go-runewidth v0.0.12 // indirect
github.com/mattn/go-sqlite3 v1.14.7 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
github.com/mitchellh/mapstructure v1.1.2 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect

@ -4,13 +4,11 @@ import (
"context"
_ "embed"
"flag"
"io"
"io/fs"
"net"
"net/http"
"os"
"os/signal"
"strings"
"syscall"
"time"
@ -19,8 +17,6 @@ import (
"github.com/cloudreve/Cloudreve/v3/pkg/conf"
"github.com/cloudreve/Cloudreve/v3/pkg/util"
"github.com/cloudreve/Cloudreve/v3/routers"
"github.com/mholt/archiver/v4"
)
var (
@ -36,17 +32,14 @@ var staticZip string
var staticFS fs.FS
func init() {
flag.StringVar(&confPath, "c", util.RelativePath("conf.ini"), "配置文件路径")
flag.BoolVar(&isEject, "eject", false, "导出内置静态资源")
flag.StringVar(&scriptName, "database-script", "", "运行内置数据库助手脚本")
flag.BoolVar(&skipConfLoading, "skip-conf", false, "跳过配置文件加载")
flag.StringVar(&confPath, "c", util.RelativePath("conf.ini"), "Path to the config file.")
flag.BoolVar(&isEject, "eject", false, "Eject all embedded static files.")
flag.StringVar(&scriptName, "database-script", "", "Name of database util script.")
flag.BoolVar(&skipConfLoading, "skip-conf", false, "Skip conf file loading")
flag.Parse()
staticFS = archiver.ArchiveFS{
Stream: io.NewSectionReader(strings.NewReader(staticZip), 0, int64(len(staticZip))),
Format: archiver.Zip{},
}
bootstrap.Init(confPath, skipConfLoading, staticFS)
staticFS = bootstrap.NewFS(staticZip)
bootstrap.Init(confPath, staticFS)
}
func main() {
@ -73,7 +66,7 @@ func main() {
signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM, syscall.SIGHUP, syscall.SIGQUIT)
go func() {
sig := <-sigChan
util.Log().Info("收到信号 %s开始关闭 server", sig)
util.Log().Info("Signal %s received, shutting down server...", sig)
ctx := context.Background()
if conf.SystemConfig.GracePeriod != 0 {
var cancel context.CancelFunc
@ -83,16 +76,16 @@ func main() {
err := server.Shutdown(ctx)
if err != nil {
util.Log().Error("关闭 server 错误, %s", err)
util.Log().Error("Failed to shutdown server: %s", err)
}
}()
// 如果启用了SSL
if conf.SSLConfig.CertPath != "" {
util.Log().Info("开始监听 %s", conf.SSLConfig.Listen)
util.Log().Info("Listening to %q", conf.SSLConfig.Listen)
server.Addr = conf.SSLConfig.Listen
if err := server.ListenAndServeTLS(conf.SSLConfig.CertPath, conf.SSLConfig.KeyPath); err != nil {
util.Log().Error("无法监听[%s]%s", conf.SSLConfig.Listen, err)
util.Log().Error("Failed to listen to %q: %s", conf.SSLConfig.Listen, err)
return
}
}
@ -102,23 +95,23 @@ func main() {
// delete socket file before listening
if _, err := os.Stat(conf.UnixConfig.Listen); err == nil {
if err = os.Remove(conf.UnixConfig.Listen); err != nil {
util.Log().Error("删除 socket 文件错误, %s", err)
util.Log().Error("Failed to delete socket file: %s", err)
return
}
}
api.TrustedPlatform = conf.UnixConfig.ProxyHeader
util.Log().Info("开始监听 %s", conf.UnixConfig.Listen)
util.Log().Info("Listening to %q", conf.UnixConfig.Listen)
if err := RunUnix(server); err != nil {
util.Log().Error("无法监听[%s]%s", conf.UnixConfig.Listen, err)
util.Log().Error("Failed to listen to %q: %s", conf.UnixConfig.Listen, err)
}
return
}
util.Log().Info("开始监听 %s", conf.SystemConfig.Listen)
util.Log().Info("Listening to %q", conf.SystemConfig.Listen)
server.Addr = conf.SystemConfig.Listen
if err := server.ListenAndServe(); err != nil {
util.Log().Error("无法监听[%s]%s", conf.SystemConfig.Listen, err)
util.Log().Error("Failed to listen to %q: %s", conf.SystemConfig.Listen, err)
}
}

@ -142,18 +142,18 @@ func uploadCallbackCheck(c *gin.Context, policyType string) serializer.Response
// 验证 Callback Key
sessionID := c.Param("sessionID")
if sessionID == "" {
return serializer.ParamErr("Session ID 不能为空", nil)
return serializer.ParamErr("Session ID cannot be empty", nil)
}
callbackSessionRaw, exist := cache.Get(filesystem.UploadSessionCachePrefix + sessionID)
if !exist {
return serializer.ParamErr("上传会话不存在或已过期", nil)
return serializer.Err(serializer.CodeUploadSessionExpired, "上传会话不存在或已过期", nil)
}
callbackSession := callbackSessionRaw.(serializer.UploadSession)
c.Set(filesystem.UploadSessionCtx, &callbackSession)
if callbackSession.Policy.Type != policyType {
return serializer.Err(serializer.CodePolicyNotAllowed, "Policy not supported", nil)
return serializer.Err(serializer.CodePolicyNotAllowed, "", nil)
}
// 清理回调会话
@ -162,7 +162,7 @@ func uploadCallbackCheck(c *gin.Context, policyType string) serializer.Response
// 查找用户
user, err := model.GetActiveUserByID(callbackSession.UID)
if err != nil {
return serializer.Err(serializer.CodeCheckLogin, "找不到用户", err)
return serializer.Err(serializer.CodeUserNotFound, "", err)
}
c.Set(filesystem.UserCtx, &user)
return serializer.Response{}
@ -194,14 +194,14 @@ func QiniuCallbackAuth() gin.HandlerFunc {
mac := qbox.NewMac(session.Policy.AccessKey, session.Policy.SecretKey)
ok, err := mac.VerifyCallback(c.Request)
if err != nil {
util.Log().Debug("无法验证回调请求,%s", err)
c.JSON(401, serializer.GeneralUploadCallbackFailed{Error: "无法验证回调请求"})
util.Log().Debug("Failed to verify callback request: %s", err)
c.JSON(401, serializer.GeneralUploadCallbackFailed{Error: "Failed to verify callback request."})
c.Abort()
return
}
if !ok {
c.JSON(401, serializer.GeneralUploadCallbackFailed{Error: "回调签名无效"})
c.JSON(401, serializer.GeneralUploadCallbackFailed{Error: "Invalid signature."})
c.Abort()
return
}
@ -215,8 +215,8 @@ func OSSCallbackAuth() gin.HandlerFunc {
return func(c *gin.Context) {
err := oss.VerifyCallbackSignature(c.Request)
if err != nil {
util.Log().Debug("回调签名验证失败,%s", err)
c.JSON(401, serializer.GeneralUploadCallbackFailed{Error: "回调签名验证失败"})
util.Log().Debug("Failed to verify callback request: %s", err)
c.JSON(401, serializer.GeneralUploadCallbackFailed{Error: "Failed to verify callback request."})
c.Abort()
return
}
@ -250,7 +250,7 @@ func UpyunCallbackAuth() gin.HandlerFunc {
// 计算正文MD5
actualContentMD5 := fmt.Sprintf("%x", md5.Sum(body))
if actualContentMD5 != contentMD5 {
c.JSON(401, serializer.GeneralUploadCallbackFailed{Error: "MD5不一致"})
c.JSON(401, serializer.GeneralUploadCallbackFailed{Error: "MD5 mismatch."})
c.Abort()
return
}
@ -265,7 +265,7 @@ func UpyunCallbackAuth() gin.HandlerFunc {
// 对比签名
if signature != actualSignature {
c.JSON(401, serializer.GeneralUploadCallbackFailed{Error: "鉴权失败"})
c.JSON(401, serializer.GeneralUploadCallbackFailed{Error: "Signature not match"})
c.Abort()
return
}
@ -289,7 +289,7 @@ func IsAdmin() gin.HandlerFunc {
return func(c *gin.Context) {
user, _ := c.Get("user")
if user.(*model.User).Group.ID != 1 && user.(*model.User).ID != 1 {
c.JSON(200, serializer.Err(serializer.CodeAdminRequired, "您不是管理组成员", nil))
c.JSON(200, serializer.Err(serializer.CodeNoPermissionErr, "", nil))
c.Abort()
return
}

@ -25,7 +25,7 @@ func UseSlaveAria2Instance(clusterController cluster.Controller) gin.HandlerFunc
// 获取对应主机节点的从机Aria2实例
caller, err := clusterController.GetAria2Instance(siteID.(string))
if err != nil {
c.JSON(200, serializer.Err(serializer.CodeNotSet, "无法获取 Aria2 实例", err))
c.JSON(200, serializer.Err(serializer.CodeNotSet, "Failed to get Aria2 instance", err))
c.Abort()
return
}
@ -35,7 +35,7 @@ func UseSlaveAria2Instance(clusterController cluster.Controller) gin.HandlerFunc
return
}
c.JSON(200, serializer.ParamErr("未知的主机节点ID", nil))
c.JSON(200, serializer.ParamErr("Unknown master node ID", nil))
c.Abort()
}
}
@ -44,14 +44,14 @@ func SlaveRPCSignRequired(nodePool cluster.Pool) gin.HandlerFunc {
return func(c *gin.Context) {
nodeID, err := strconv.ParseUint(c.GetHeader(auth.CrHeaderPrefix+"Node-Id"), 10, 64)
if err != nil {
c.JSON(200, serializer.ParamErr("未知的主机节点ID", err))
c.JSON(200, serializer.ParamErr("Unknown master node ID", err))
c.Abort()
return
}
slaveNode := nodePool.GetNodeByID(uint(nodeID))
if slaveNode == nil {
c.JSON(200, serializer.ParamErr("未知的主机节点ID", err))
c.JSON(200, serializer.ParamErr("Unknown master node ID", err))
c.Abort()
return
}

@ -17,7 +17,7 @@ func HashID(IDType int) gin.HandlerFunc {
c.Next()
return
}
c.JSON(200, serializer.ParamErr("无法解析对象ID", nil))
c.JSON(200, serializer.ParamErr("Failed to parse object ID", nil))
c.Abort()
return

@ -0,0 +1,30 @@
package middleware
import (
model "github.com/cloudreve/Cloudreve/v3/models"
"github.com/cloudreve/Cloudreve/v3/pkg/serializer"
"github.com/gin-gonic/gin"
)
// ValidateSourceLink validates if the perm source link is a valid redirect link
func ValidateSourceLink() gin.HandlerFunc {
return func(c *gin.Context) {
linkID, ok := c.Get("object_id")
if !ok {
c.JSON(200, serializer.Err(serializer.CodeFileNotFound, "", nil))
c.Abort()
return
}
sourceLink, err := model.GetSourceLinkByID(linkID)
if err != nil || sourceLink.File.ID == 0 || sourceLink.File.Name != c.Param("name") {
c.JSON(200, serializer.Err(serializer.CodeFileNotFound, "", nil))
c.Abort()
return
}
sourceLink.Downloaded()
c.Set("source_link", sourceLink)
c.Next()
}
}

@ -0,0 +1,57 @@
package middleware
import (
"github.com/DATA-DOG/go-sqlmock"
"github.com/gin-gonic/gin"
"github.com/stretchr/testify/assert"
"net/http/httptest"
"testing"
)
func TestValidateSourceLink(t *testing.T) {
a := assert.New(t)
rec := httptest.NewRecorder()
testFunc := ValidateSourceLink()
// ID 不存在
{
c, _ := gin.CreateTestContext(rec)
testFunc(c)
a.True(c.IsAborted())
}
// SourceLink 不存在
{
c, _ := gin.CreateTestContext(rec)
c.Set("object_id", 1)
mock.ExpectQuery("SELECT(.+)source_links(.+)").WithArgs(1).WillReturnRows(sqlmock.NewRows([]string{"id"}))
testFunc(c)
a.True(c.IsAborted())
a.NoError(mock.ExpectationsWereMet())
}
// 原文件不存在
{
c, _ := gin.CreateTestContext(rec)
c.Set("object_id", 1)
mock.ExpectQuery("SELECT(.+)source_links(.+)").WithArgs(1).WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow(1))
mock.ExpectQuery("SELECT(.+)files(.+)").WithArgs(0).WillReturnRows(sqlmock.NewRows([]string{"id"}))
testFunc(c)
a.True(c.IsAborted())
a.NoError(mock.ExpectationsWereMet())
}
// 成功
{
c, _ := gin.CreateTestContext(rec)
c.Set("object_id", 1)
mock.ExpectQuery("SELECT(.+)source_links(.+)").WithArgs(1).WillReturnRows(sqlmock.NewRows([]string{"id", "file_id"}).AddRow(1, 2))
mock.ExpectQuery("SELECT(.+)files(.+)").WithArgs(2).WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow(2))
mock.ExpectBegin()
mock.ExpectExec("UPDATE(.+)source_links").WillReturnResult(sqlmock.NewResult(1, 1))
testFunc(c)
a.False(c.IsAborted())
a.NoError(mock.ExpectationsWereMet())
}
}

@ -23,13 +23,13 @@ func FrontendFileHandler() gin.HandlerFunc {
// 读取index.html
file, err := bootstrap.StaticFS.Open("/index.html")
if err != nil {
util.Log().Warning("静态文件[index.html]不存在,可能会影响首页展示")
util.Log().Warning("Static file \"index.html\" does not exist, it might affect the display of the homepage.")
return ignoreFunc
}
fileContentBytes, err := ioutil.ReadAll(file)
if err != nil {
util.Log().Warning("静态文件[index.html]读取失败,可能会影响首页展示")
util.Log().Warning("Cannot read static file \"index.html\", it might affect the display of the homepage.")
return ignoreFunc
}
fileContent := string(fileContentBytes)
@ -39,7 +39,11 @@ func FrontendFileHandler() gin.HandlerFunc {
path := c.Request.URL.Path
// API 跳过
if strings.HasPrefix(path, "/api") || strings.HasPrefix(path, "/custom") || strings.HasPrefix(path, "/dav") || path == "/manifest.json" {
if strings.HasPrefix(path, "/api") ||
strings.HasPrefix(path, "/custom") ||
strings.HasPrefix(path, "/dav") ||
strings.HasPrefix(path, "/f") ||
path == "/manifest.json" {
c.Next()
return
}

@ -22,10 +22,10 @@ func Session(secret string) gin.HandlerFunc {
var err error
Store, err = redis.NewStoreWithDB(10, conf.RedisConfig.Network, conf.RedisConfig.Server, conf.RedisConfig.Password, strconv.Itoa(conf.RedisConfig.DB), []byte(secret))
if err != nil {
util.Log().Panic("无法连接到 Redis%s", err)
util.Log().Panic("Failed to connect to Redis%s", err)
}
util.Log().Info("已连接到 Redis 服务器:%s", conf.RedisConfig.Server)
util.Log().Info("Connect to Redis server %q.", conf.RedisConfig.Server)
} else {
Store = memstore.NewStore([]byte(secret))
}
@ -52,7 +52,7 @@ func CSRFCheck() gin.HandlerFunc {
return
}
c.JSON(200, serializer.Err(serializer.CodeNoPermissionErr, "来源非法", nil))
c.JSON(200, serializer.Err(serializer.CodeNoPermissionErr, "Invalid origin", nil))
c.Abort()
}
}

@ -16,14 +16,14 @@ func ShareOwner() gin.HandlerFunc {
if userCtx, ok := c.Get("user"); ok {
user = userCtx.(*model.User)
} else {
c.JSON(200, serializer.Err(serializer.CodeCheckLogin, "请先登录", nil))
c.JSON(200, serializer.Err(serializer.CodeCheckLogin, "", nil))
c.Abort()
return
}
if share, ok := c.Get("share"); ok {
if share.(*model.Share).Creator().ID != user.ID {
c.JSON(200, serializer.Err(serializer.CodeNotFound, "分享不存在", nil))
c.JSON(200, serializer.Err(serializer.CodeShareLinkNotFound, "", nil))
c.Abort()
return
}
@ -46,7 +46,7 @@ func ShareAvailable() gin.HandlerFunc {
share := model.GetShareByHashID(c.Param("id"))
if share == nil || !share.IsAvailable() {
c.JSON(200, serializer.Err(serializer.CodeNotFound, "分享不存在或已失效", nil))
c.JSON(200, serializer.Err(serializer.CodeShareLinkNotFound, "", nil))
c.Abort()
return
}
@ -65,7 +65,7 @@ func ShareCanPreview() gin.HandlerFunc {
c.Next()
return
}
c.JSON(200, serializer.Err(serializer.CodeNoPermissionErr, "此分享无法预览",
c.JSON(200, serializer.Err(serializer.CodeDisabledSharePreview, "",
nil))
c.Abort()
return
@ -85,7 +85,7 @@ func CheckShareUnlocked() gin.HandlerFunc {
unlocked := util.GetSession(c, sessionKey) != nil
if !unlocked {
c.JSON(200, serializer.Err(serializer.CodeNoPermissionErr,
"无权访问此分享", nil))
"", nil))
c.Abort()
return
}
@ -109,7 +109,7 @@ func BeforeShareDownload() gin.HandlerFunc {
// 检查用户是否可以下载此分享的文件
err := share.CanBeDownloadBy(user)
if err != nil {
c.JSON(200, serializer.Err(serializer.CodeNoPermissionErr, err.Error(),
c.JSON(200, serializer.Err(serializer.CodeGroupNotAllowed, err.Error(),
nil))
c.Abort()
return
@ -118,7 +118,7 @@ func BeforeShareDownload() gin.HandlerFunc {
// 对积分、下载次数进行更新
err = share.DownloadBy(user, c)
if err != nil {
c.JSON(200, serializer.Err(serializer.CodeNoPermissionErr, err.Error(),
c.JSON(200, serializer.Err(serializer.CodeGroupNotAllowed, err.Error(),
nil))
c.Abort()
return

@ -11,9 +11,9 @@ var defaultSettings = []Setting{
{Name: "siteName", Value: `Cloudreve`, Type: "basic"},
{Name: "register_enabled", Value: `1`, Type: "register"},
{Name: "default_group", Value: `2`, Type: "register"},
{Name: "siteKeywords", Value: `网盘,网盘`, Type: "basic"},
{Name: "siteKeywords", Value: `Cloudreve, cloud storage`, Type: "basic"},
{Name: "siteDes", Value: `Cloudreve`, Type: "basic"},
{Name: "siteTitle", Value: `平步云端`, Type: "basic"},
{Name: "siteTitle", Value: `Inclusive cloud storage for everyone`, Type: "basic"},
{Name: "siteScript", Value: ``, Type: "basic"},
{Name: "siteID", Value: uuid.Must(uuid.NewV4()).String(), Type: "basic"},
{Name: "fromName", Value: `Cloudreve`, Type: "mail"},

@ -32,6 +32,7 @@ type Download struct {
// 数据库忽略字段
StatusInfo rpc.StatusInfo `gorm:"-"`
Task *Task `gorm:"-"`
NodeName string `gorm:"-"`
}
// AfterFind 找到下载任务后的钩子处理Status结构
@ -60,7 +61,7 @@ func (task *Download) BeforeSave() (err error) {
// Create 创建离线下载记录
func (task *Download) Create() (uint, error) {
if err := DB.Create(task).Error; err != nil {
util.Log().Warning("无法插入离线下载记录, %s", err)
util.Log().Warning("Failed to insert download record: %s", err)
return 0, err
}
return task.ID, nil
@ -69,7 +70,7 @@ func (task *Download) Create() (uint, error) {
// Save 更新
func (task *Download) Save() error {
if err := DB.Save(task).Error; err != nil {
util.Log().Warning("无法更新离线下载记录, %s", err)
util.Log().Warning("Failed to update download record: %s", err)
return err
}
return nil

@ -4,6 +4,7 @@ import (
"encoding/gob"
"encoding/json"
"errors"
"fmt"
"path"
"time"
@ -43,7 +44,7 @@ func (file *File) Create() error {
tx := DB.Begin()
if err := tx.Create(file).Error; err != nil {
util.Log().Warning("无法插入文件记录, %s", err)
util.Log().Warning("Failed to insert file record: %s", err)
tx.Rollback()
return err
}
@ -186,15 +187,20 @@ func RemoveFilesWithSoftLinks(files []File) ([]File, error) {
// 结果值
filteredFiles := make([]File, 0)
if len(files) == 0 {
return filteredFiles, nil
}
// 查询软链接的文件
var filesWithSoftLinks []File
tx := DB
for _, value := range files {
tx = tx.Or("source_name = ? and policy_id = ? and id != ?", value.SourceName, value.PolicyID, value.ID)
filesWithSoftLinks := make([]File, 0)
for _, file := range files {
var softLinkFile File
res := DB.
Where("source_name = ? and policy_id = ? and id != ?", file.SourceName, file.PolicyID, file.ID).
First(&softLinkFile)
if res.Error == nil {
filesWithSoftLinks = append(filesWithSoftLinks, softLinkFile)
}
result := tx.Find(&filesWithSoftLinks)
if result.Error != nil {
return nil, result.Error
}
// 过滤具有软连接的文件
@ -334,6 +340,25 @@ func (file *File) CanCopy() bool {
return file.UploadSessionID == nil
}
// CreateOrGetSourceLink creates a SourceLink model. If the given model exists, the existing
// model will be returned.
func (file *File) CreateOrGetSourceLink() (*SourceLink, error) {
res := &SourceLink{}
err := DB.Set("gorm:auto_preload", true).Where("file_id = ?", file.ID).Find(&res).Error
if err == nil && res.ID > 0 {
return res, nil
}
res.FileID = file.ID
res.Name = file.Name
if err := DB.Save(res).Error; err != nil {
return nil, fmt.Errorf("failed to insert SourceLink: %w", err)
}
res.File = *file
return res, nil
}
/*
webdav.FileInfo
*/

@ -257,6 +257,19 @@ func TestFile_GetPolicy(t *testing.T) {
}
}
func TestRemoveFilesWithSoftLinks_EmptyArg(t *testing.T) {
asserts := assert.New(t)
// 传入空
{
mock.ExpectQuery("SELECT(.+)files(.+)")
file, err := RemoveFilesWithSoftLinks([]File{})
asserts.Error(mock.ExpectationsWereMet())
asserts.NoError(err)
asserts.Equal(len(file), 0)
DB.Find(&File{})
}
}
func TestRemoveFilesWithSoftLinks(t *testing.T) {
asserts := assert.New(t)
files := []File{
@ -272,30 +285,34 @@ func TestRemoveFilesWithSoftLinks(t *testing.T) {
},
}
// 传入空文件列表
{
file, err := RemoveFilesWithSoftLinks([]File{})
asserts.NoError(err)
asserts.Empty(file)
}
// 全都没有
{
mock.ExpectQuery("SELECT(.+)files(.+)").
WithArgs("1.txt", 23, 1, "2.txt", 24, 2).
WithArgs("1.txt", 23, 1).
WillReturnRows(sqlmock.NewRows([]string{"id", "policy_id", "source_name"}))
mock.ExpectQuery("SELECT(.+)files(.+)").
WithArgs("2.txt", 24, 2).
WillReturnRows(sqlmock.NewRows([]string{"id", "policy_id", "source_name"}))
file, err := RemoveFilesWithSoftLinks(files)
asserts.NoError(mock.ExpectationsWereMet())
asserts.NoError(err)
asserts.Equal(files, file)
}
// 查询出错
{
mock.ExpectQuery("SELECT(.+)files(.+)").
WithArgs("1.txt", 23, 1, "2.txt", 24, 2).
WillReturnError(errors.New("error"))
file, err := RemoveFilesWithSoftLinks(files)
asserts.NoError(mock.ExpectationsWereMet())
asserts.Error(err)
asserts.Nil(file)
}
// 第二个是软链
{
mock.ExpectQuery("SELECT(.+)files(.+)").
WithArgs("1.txt", 23, 1, "2.txt", 24, 2).
WithArgs("1.txt", 23, 1).
WillReturnRows(sqlmock.NewRows([]string{"id", "policy_id", "source_name"}))
mock.ExpectQuery("SELECT(.+)files(.+)").
WithArgs("2.txt", 24, 2).
WillReturnRows(
sqlmock.NewRows([]string{"id", "policy_id", "source_name"}).
AddRow(3, 24, "2.txt"),
@ -305,14 +322,18 @@ func TestRemoveFilesWithSoftLinks(t *testing.T) {
asserts.NoError(err)
asserts.Equal(files[:1], file)
}
// 第一个是软链
{
mock.ExpectQuery("SELECT(.+)files(.+)").
WithArgs("1.txt", 23, 1, "2.txt", 24, 2).
WithArgs("1.txt", 23, 1).
WillReturnRows(
sqlmock.NewRows([]string{"id", "policy_id", "source_name"}).
AddRow(3, 23, "1.txt"),
)
mock.ExpectQuery("SELECT(.+)files(.+)").
WithArgs("2.txt", 24, 2).
WillReturnRows(sqlmock.NewRows([]string{"id", "policy_id", "source_name"}))
file, err := RemoveFilesWithSoftLinks(files)
asserts.NoError(mock.ExpectationsWereMet())
asserts.NoError(err)
@ -321,11 +342,16 @@ func TestRemoveFilesWithSoftLinks(t *testing.T) {
// 全部是软链
{
mock.ExpectQuery("SELECT(.+)files(.+)").
WithArgs("1.txt", 23, 1, "2.txt", 24, 2).
WithArgs("1.txt", 23, 1).
WillReturnRows(
sqlmock.NewRows([]string{"id", "policy_id", "source_name"}).
AddRow(3, 24, "2.txt").
AddRow(4, 23, "1.txt"),
AddRow(3, 23, "1.txt"),
)
mock.ExpectQuery("SELECT(.+)files(.+)").
WithArgs("2.txt", 24, 2).
WillReturnRows(
sqlmock.NewRows([]string{"id", "policy_id", "source_name"}).
AddRow(3, 24, "2.txt"),
)
file, err := RemoveFilesWithSoftLinks(files)
asserts.NoError(mock.ExpectationsWereMet())
@ -585,3 +611,44 @@ func TestGetFilesByKeywords(t *testing.T) {
asserts.Len(res, 1)
}
}
func TestFile_CreateOrGetSourceLink(t *testing.T) {
a := assert.New(t)
file := &File{}
file.ID = 1
// 已存在,返回老的 SourceLink
{
mock.ExpectQuery("SELECT(.+)source_links(.+)").WithArgs(1).WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow(2))
res, err := file.CreateOrGetSourceLink()
a.NoError(err)
a.EqualValues(2, res.ID)
a.NoError(mock.ExpectationsWereMet())
}
// 不存在,插入失败
{
expectedErr := errors.New("error")
mock.ExpectQuery("SELECT(.+)source_links(.+)").WithArgs(1).WillReturnRows(sqlmock.NewRows([]string{"id"}))
mock.ExpectBegin()
mock.ExpectExec("INSERT(.+)source_links(.+)").WillReturnError(expectedErr)
mock.ExpectRollback()
res, err := file.CreateOrGetSourceLink()
a.Nil(res)
a.ErrorIs(err, expectedErr)
a.NoError(mock.ExpectationsWereMet())
}
// 成功
{
mock.ExpectQuery("SELECT(.+)source_links(.+)").WithArgs(1).WillReturnRows(sqlmock.NewRows([]string{"id"}))
mock.ExpectBegin()
mock.ExpectExec("INSERT(.+)source_links(.+)").WillReturnResult(sqlmock.NewResult(2, 1))
mock.ExpectCommit()
res, err := file.CreateOrGetSourceLink()
a.NoError(err)
a.EqualValues(2, res.ID)
a.EqualValues(file.ID, res.File.ID)
a.NoError(mock.ExpectationsWereMet())
}
}

@ -161,7 +161,7 @@ func (folder *Folder) MoveOrCopyFileTo(files []uint, dstFolder *Folder, isCopy b
// 复制文件记录
for _, oldFile := range originFiles {
if !oldFile.CanCopy() {
util.Log().Warning("无法复制正在上传中的文件 [%s] 跳过...", oldFile.Name)
util.Log().Warning("Cannot copy file %q because it's being uploaded now, skipping...", oldFile.Name)
continue
}
@ -224,8 +224,8 @@ func (folder *Folder) CopyFolderTo(folderID uint, dstFolder *Folder) (size uint6
} else if IDCache, ok := newIDCache[*folder.ParentID]; ok {
newID = IDCache
} else {
util.Log().Warning("无法取得新的父目录:%d", folder.ParentID)
return size, errors.New("无法取得新的父目录")
util.Log().Warning("Failed to get parent folder %q", *folder.ParentID)
return size, errors.New("Failed to get parent folder")
}
// 插入新的目录记录
@ -254,7 +254,7 @@ func (folder *Folder) CopyFolderTo(folderID uint, dstFolder *Folder) (size uint6
// 复制文件记录
for _, oldFile := range originFiles {
if !oldFile.CanCopy() {
util.Log().Warning("无法复制正在上传中的文件 [%s] 跳过...", oldFile.Name)
util.Log().Warning("Cannot copy file %q because it's being uploaded now, skipping...", oldFile.Name)
continue
}

@ -32,6 +32,7 @@ type GroupOption struct {
Aria2 bool `json:"aria2,omitempty"` // 离线下载
Aria2Options map[string]interface{} `json:"aria2_options,omitempty"` // 离线下载用户组配置
SourceBatchSize int `json:"source_batch,omitempty"`
RedirectedSource bool `json:"redirected_source,omitempty"`
Aria2BatchSize int `json:"aria2_batch,omitempty"`
}
@ -66,7 +67,7 @@ func (group *Group) BeforeSave() (err error) {
return err
}
//SerializePolicyList 将序列后的可选策略列表、配置写入数据库字段
// SerializePolicyList 将序列后的可选策略列表、配置写入数据库字段
// TODO 完善测试
func (group *Group) SerializePolicyList() (err error) {
policies, err := json.Marshal(&group.PolicyList)

@ -20,7 +20,7 @@ var DB *gorm.DB
// Init 初始化 MySQL 链接
func Init() {
util.Log().Info("初始化数据库连接")
util.Log().Info("Initializing database connection...")
var (
db *gorm.DB
@ -51,13 +51,13 @@ func Init() {
conf.DatabaseConfig.Name,
conf.DatabaseConfig.Charset))
default:
util.Log().Panic("不支持数据库类型: %s", conf.DatabaseConfig.Type)
util.Log().Panic("Unsupported database type %q.", conf.DatabaseConfig.Type)
}
}
//db.SetLogger(util.Log())
if err != nil {
util.Log().Panic("连接数据库不成功, %s", err)
util.Log().Panic("Failed to connect to database: %s", err)
}
// 处理表前缀

@ -19,16 +19,16 @@ func needMigration() bool {
return DB.Where("name = ?", "db_version_"+conf.RequiredDBVersion).First(&setting).Error != nil
}
//执行数据迁移
// 执行数据迁移
func migration() {
// 确认是否需要执行迁移
if !needMigration() {
util.Log().Info("数据库版本匹配,跳过数据库迁移")
util.Log().Info("Database version fulfilled, skip schema migration.")
return
}
util.Log().Info("开始进行数据库初始化...")
util.Log().Info("Start initializing database schema...")
// 清除所有缓存
if instance, ok := cache.Store.(*cache.RedisStore); ok {
@ -41,7 +41,7 @@ func migration() {
}
DB.AutoMigrate(&User{}, &Setting{}, &Group{}, &Policy{}, &Folder{}, &File{}, &Share{},
&Task{}, &Download{}, &Tag{}, &Webdav{}, &Node{})
&Task{}, &Download{}, &Tag{}, &Webdav{}, &Node{}, &SourceLink{})
// 创建初始存储策略
addDefaultPolicy()
@ -61,7 +61,7 @@ func migration() {
// 执行数据库升级脚本
execUpgradeScripts()
util.Log().Info("数据库初始化结束")
util.Log().Info("Finish initializing database schema.")
}
@ -70,7 +70,7 @@ func addDefaultPolicy() {
// 未找到初始存储策略时,则创建
if gorm.IsRecordNotFoundError(err) {
defaultPolicy := Policy{
Name: "默认存储策略",
Name: "Default storage policy",
Type: "local",
MaxSize: 0,
AutoRename: true,
@ -82,7 +82,7 @@ func addDefaultPolicy() {
},
}
if err := DB.Create(&defaultPolicy).Error; err != nil {
util.Log().Panic("无法创建初始存储策略, %s", err)
util.Log().Panic("Failed to create default storage policy: %s", err)
}
}
}
@ -98,7 +98,7 @@ func addDefaultGroups() {
// 未找到初始管理组时,则创建
if gorm.IsRecordNotFoundError(err) {
defaultAdminGroup := Group{
Name: "管理员",
Name: "Admin",
PolicyList: []uint{1},
MaxStorage: 1 * 1024 * 1024 * 1024,
ShareEnabled: true,
@ -110,10 +110,11 @@ func addDefaultGroups() {
Aria2: true,
SourceBatchSize: 1000,
Aria2BatchSize: 50,
RedirectedSource: true,
},
}
if err := DB.Create(&defaultAdminGroup).Error; err != nil {
util.Log().Panic("无法创建管理用户组, %s", err)
util.Log().Panic("Failed to create admin user group: %s", err)
}
}
@ -122,7 +123,7 @@ func addDefaultGroups() {
// 未找到初始注册会员时,则创建
if gorm.IsRecordNotFoundError(err) {
defaultAdminGroup := Group{
Name: "注册会员",
Name: "User",
PolicyList: []uint{1},
MaxStorage: 1 * 1024 * 1024 * 1024,
ShareEnabled: true,
@ -131,10 +132,11 @@ func addDefaultGroups() {
ShareDownload: true,
SourceBatchSize: 10,
Aria2BatchSize: 1,
RedirectedSource: true,
},
}
if err := DB.Create(&defaultAdminGroup).Error; err != nil {
util.Log().Panic("无法创建初始注册会员用户组, %s", err)
util.Log().Panic("Failed to create initial user group: %s", err)
}
}
@ -143,7 +145,7 @@ func addDefaultGroups() {
// 未找到初始游客用户组时,则创建
if gorm.IsRecordNotFoundError(err) {
defaultAdminGroup := Group{
Name: "游客",
Name: "Anonymous",
PolicyList: []uint{},
Policies: "[]",
OptionsSerialized: GroupOption{
@ -151,7 +153,7 @@ func addDefaultGroups() {
},
}
if err := DB.Create(&defaultAdminGroup).Error; err != nil {
util.Log().Panic("无法创建初始游客用户组, %s", err)
util.Log().Panic("Failed to create anonymous user group: %s", err)
}
}
}
@ -169,15 +171,15 @@ func addDefaultUser() {
defaultUser.GroupID = 1
err := defaultUser.SetPassword(password)
if err != nil {
util.Log().Panic("无法创建密码, %s", err)
util.Log().Panic("Failed to create password: %s", err)
}
if err := DB.Create(&defaultUser).Error; err != nil {
util.Log().Panic("无法创建初始用户, %s", err)
util.Log().Panic("Failed to create initial root user: %s", err)
}
c := color.New(color.FgWhite).Add(color.BgBlack).Add(color.Bold)
util.Log().Info("初始管理员账号:" + c.Sprint("admin@cloudreve.org"))
util.Log().Info("初始管理员密码:" + c.Sprint(password))
util.Log().Info("Admin user name: " + c.Sprint("admin@cloudreve.org"))
util.Log().Info("Admin password: " + c.Sprint(password))
}
}
@ -186,7 +188,7 @@ func addDefaultNode() {
if gorm.IsRecordNotFoundError(err) {
defaultAdminGroup := Node{
Name: "主机(本机)",
Name: "Master (Local machine)",
Status: NodeActive,
Type: MasterNodeType,
Aria2OptionsSerialized: Aria2Option{
@ -195,7 +197,7 @@ func addDefaultNode() {
},
}
if err := DB.Create(&defaultAdminGroup).Error; err != nil {
util.Log().Panic("无法创建初始节点记录, %s", err)
util.Log().Panic("Failed to create initial node: %s", err)
}
}
}

@ -36,7 +36,7 @@ type Share struct {
// Create 创建分享
func (share *Share) Create() (uint, error) {
if err := DB.Create(share).Error; err != nil {
util.Log().Warning("无法插入数据库记录, %s", err)
util.Log().Warning("Failed to insert share record: %s", err)
return 0, err
}
return share.ID, nil
@ -131,9 +131,9 @@ func (share *Share) CanBeDownloadBy(user *User) error {
// 用户组权限
if !user.Group.OptionsSerialized.ShareDownload {
if user.IsAnonymous() {
return errors.New("未登录用户无法下载")
return errors.New("you must login to download")
}
return errors.New("您当前的用户组无权下载")
return errors.New("your group has no permission to download")
}
return nil
}

@ -0,0 +1,47 @@
package model
import (
"fmt"
"github.com/cloudreve/Cloudreve/v3/pkg/hashid"
"github.com/jinzhu/gorm"
"net/url"
)
// SourceLink represent a shared file source link
type SourceLink struct {
gorm.Model
FileID uint // corresponding file ID
Name string // name of the file while creating the source link, for annotation
Downloads int // 下载数
// 关联模型
File File `gorm:"save_associations:false:false"`
}
// Link gets the URL of a SourceLink
func (s *SourceLink) Link() (string, error) {
baseURL := GetSiteURL()
linkPath, err := url.Parse(fmt.Sprintf("/f/%s/%s", hashid.HashID(s.ID, hashid.SourceLinkID), s.File.Name))
if err != nil {
return "", err
}
return baseURL.ResolveReference(linkPath).String(), nil
}
// GetTasksByID queries source link based on ID
func GetSourceLinkByID(id interface{}) (*SourceLink, error) {
link := &SourceLink{}
result := DB.Where("id = ?", id).First(link)
files, _ := GetFilesByIDs([]uint{link.FileID}, 0)
if len(files) > 0 {
link.File = files[0]
}
return link, result.Error
}
// Viewed 增加访问次数
func (s *SourceLink) Downloaded() {
s.Downloads++
DB.Model(s).UpdateColumn("downloads", gorm.Expr("downloads + ?", 1))
}

@ -0,0 +1,52 @@
package model
import (
"github.com/DATA-DOG/go-sqlmock"
"github.com/stretchr/testify/assert"
"testing"
)
func TestSourceLink_Link(t *testing.T) {
a := assert.New(t)
s := &SourceLink{}
s.ID = 1
// 失败
{
s.File.Name = string([]byte{0x7f})
res, err := s.Link()
a.Error(err)
a.Empty(res)
}
// 成功
{
s.File.Name = "filename"
res, err := s.Link()
a.NoError(err)
a.Contains(res, s.Name)
}
}
func TestGetSourceLinkByID(t *testing.T) {
a := assert.New(t)
mock.ExpectQuery("SELECT(.+)source_links(.+)").WithArgs(1).WillReturnRows(sqlmock.NewRows([]string{"id", "file_id"}).AddRow(1, 2))
mock.ExpectQuery("SELECT(.+)files(.+)").WithArgs(2).WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow(2))
res, err := GetSourceLinkByID(1)
a.NoError(err)
a.NotNil(res)
a.EqualValues(2, res.File.ID)
a.NoError(mock.ExpectationsWereMet())
}
func TestSourceLink_Downloaded(t *testing.T) {
a := assert.New(t)
s := &SourceLink{}
s.ID = 1
mock.ExpectBegin()
mock.ExpectExec("UPDATE(.+)source_links(.+)").WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectCommit()
s.Downloaded()
a.NoError(mock.ExpectationsWereMet())
}

@ -26,7 +26,7 @@ const (
// Create 创建标签记录
func (tag *Tag) Create() (uint, error) {
if err := DB.Create(tag).Error; err != nil {
util.Log().Warning("无法插入离线下载记录, %s", err)
util.Log().Warning("Failed to insert tag record: %s", err)
return 0, err
}
return tag.ID, nil

@ -19,7 +19,7 @@ type Task struct {
// Create 创建任务记录
func (task *Task) Create() (uint, error) {
if err := DB.Create(task).Error; err != nil {
util.Log().Warning("无法插入任务记录, %s", err)
util.Log().Warning("Failed to insert task record: %s", err)
return 0, err
}
return task.ID, nil

@ -3,8 +3,6 @@ package aria2
import (
"context"
"fmt"
"github.com/cloudreve/Cloudreve/v3/pkg/cluster"
"github.com/cloudreve/Cloudreve/v3/pkg/mq"
"net/url"
"sync"
"time"
@ -14,6 +12,8 @@ import (
"github.com/cloudreve/Cloudreve/v3/pkg/aria2/monitor"
"github.com/cloudreve/Cloudreve/v3/pkg/aria2/rpc"
"github.com/cloudreve/Cloudreve/v3/pkg/balancer"
"github.com/cloudreve/Cloudreve/v3/pkg/cluster"
"github.com/cloudreve/Cloudreve/v3/pkg/mq"
)
// Instance 默认使用的Aria2处理实例
@ -40,7 +40,7 @@ func Init(isReload bool, pool cluster.Pool, mqClient mq.MQ) {
if !isReload {
// 从数据库中读取未完成任务,创建监控
unfinished := model.GetDownloadsByStatus(common.Ready, common.Paused, common.Downloading)
unfinished := model.GetDownloadsByStatus(common.Ready, common.Paused, common.Downloading, common.Seeding)
for i := 0; i < len(unfinished); i++ {
// 创建任务监控

@ -46,13 +46,15 @@ const (
Canceled
// Unknown 未知状态
Unknown
// Seeding 做种中
Seeding
)
var (
// ErrNotEnabled 功能未开启错误
ErrNotEnabled = serializer.NewError(serializer.CodeNoPermissionErr, "离线下载功能未开启", nil)
ErrNotEnabled = serializer.NewError(serializer.CodeFeatureNotEnabled, "not enabled", nil)
// ErrUserNotFound 未找到下载任务创建者
ErrUserNotFound = serializer.NewError(serializer.CodeNotFound, "无法找到任务创建者", nil)
ErrUserNotFound = serializer.NewError(serializer.CodeUserNotFound, "", nil)
)
// DummyAria2 未开启Aria2功能时使用的默认处理器
@ -94,11 +96,14 @@ func (instance *DummyAria2) DeleteTempFile(src *model.Download) error {
}
// GetStatus 将给定的状态字符串转换为状态标识数字
func GetStatus(status string) int {
switch status {
func GetStatus(status rpc.StatusInfo) int {
switch status.Status {
case "complete":
return Complete
case "active":
if status.BitTorrent.Mode != "" && status.CompletedLength == status.TotalLength {
return Seeding
}
return Downloading
case "waiting":
return Ready

@ -1,9 +1,11 @@
package common
import (
"testing"
model "github.com/cloudreve/Cloudreve/v3/models"
"github.com/cloudreve/Cloudreve/v3/pkg/aria2/rpc"
"github.com/stretchr/testify/assert"
"testing"
)
func TestDummyAria2(t *testing.T) {
@ -35,11 +37,18 @@ func TestDummyAria2(t *testing.T) {
func TestGetStatus(t *testing.T) {
a := assert.New(t)
a.Equal(GetStatus("complete"), Complete)
a.Equal(GetStatus("active"), Downloading)
a.Equal(GetStatus("waiting"), Ready)
a.Equal(GetStatus("paused"), Paused)
a.Equal(GetStatus("error"), Error)
a.Equal(GetStatus("removed"), Canceled)
a.Equal(GetStatus("unknown"), Unknown)
a.Equal(GetStatus(rpc.StatusInfo{Status: "complete"}), Complete)
a.Equal(GetStatus(rpc.StatusInfo{Status: "active",
BitTorrent: rpc.BitTorrentInfo{Mode: ""}}), Downloading)
a.Equal(GetStatus(rpc.StatusInfo{Status: "active",
BitTorrent: rpc.BitTorrentInfo{Mode: "single"},
TotalLength: "100", CompletedLength: "50"}), Downloading)
a.Equal(GetStatus(rpc.StatusInfo{Status: "active",
BitTorrent: rpc.BitTorrentInfo{Mode: "multi"},
TotalLength: "100", CompletedLength: "100"}), Seeding)
a.Equal(GetStatus(rpc.StatusInfo{Status: "waiting"}), Ready)
a.Equal(GetStatus(rpc.StatusInfo{Status: "paused"}), Paused)
a.Equal(GetStatus(rpc.StatusInfo{Status: "error"}), Error)
a.Equal(GetStatus(rpc.StatusInfo{Status: "removed"}), Canceled)
a.Equal(GetStatus(rpc.StatusInfo{Status: "unknown"}), Unknown)
}

@ -45,7 +45,7 @@ func NewMonitor(task *model.Download, pool cluster.Pool, mqClient mq.MQ) {
monitor.notifier = mqClient.Subscribe(monitor.Task.GID, 0)
} else {
monitor.setErrorStatus(errors.New("节点不可用"))
monitor.setErrorStatus(errors.New("node not avaliable"))
}
}
@ -77,11 +77,12 @@ func (monitor *Monitor) Update() bool {
if err != nil {
monitor.retried++
util.Log().Warning("无法获取下载任务[%s]的状态,%s", monitor.Task.GID, err)
util.Log().Warning("Cannot get status of download task %q: %s", monitor.Task.GID, err)
// 十次重试后认定为任务失败
if monitor.retried > MAX_RETRY {
util.Log().Warning("无法获取下载任务[%s]的状态,超过最大重试次数限制,%s", monitor.Task.GID, err)
util.Log().Warning("Cannot get status of download task %qexceed maximum retry threshold: %s",
monitor.Task.GID, err)
monitor.setErrorStatus(err)
monitor.RemoveTempFolder()
return true
@ -93,7 +94,7 @@ func (monitor *Monitor) Update() bool {
// 磁力链下载需要跟随
if len(status.FollowedBy) > 0 {
util.Log().Debug("离线下载[%s]重定向至[%s]", monitor.Task.GID, status.FollowedBy[0])
util.Log().Debug("Redirected download task from %q to %q.", monitor.Task.GID, status.FollowedBy[0])
monitor.Task.GID = status.FollowedBy[0]
monitor.Task.Save()
return false
@ -101,28 +102,28 @@ func (monitor *Monitor) Update() bool {
// 更新任务信息
if err := monitor.UpdateTaskInfo(status); err != nil {
util.Log().Warning("无法更新下载任务[%s]的任务信息[%s]", monitor.Task.GID, err)
util.Log().Warning("Failed to update status of download task %q: %s", monitor.Task.GID, err)
monitor.setErrorStatus(err)
monitor.RemoveTempFolder()
return true
}
util.Log().Debug("离线下载[%s]更新状态[%s]", status.Gid, status.Status)
util.Log().Debug("Remote download %q status updated to %q.", status.Gid, status.Status)
switch status.Status {
case "complete":
switch common.GetStatus(status) {
case common.Complete, common.Seeding:
return monitor.Complete(task.TaskPoll)
case "error":
case common.Error:
return monitor.Error(status)
case "active", "waiting", "paused":
case common.Downloading, common.Ready, common.Paused:
return false
case "removed":
case common.Canceled:
monitor.Task.Status = common.Canceled
monitor.Task.Save()
monitor.RemoveTempFolder()
return true
default:
util.Log().Warning("下载任务[%s]返回未知状态信息[%s]", monitor.Task.GID, status.Status)
util.Log().Warning("Download task %q returns unknown status %q.", monitor.Task.GID, status.Status)
return true
}
}
@ -132,7 +133,7 @@ func (monitor *Monitor) UpdateTaskInfo(status rpc.StatusInfo) error {
originSize := monitor.Task.TotalSize
monitor.Task.GID = status.Gid
monitor.Task.Status = common.GetStatus(status.Status)
monitor.Task.Status = common.GetStatus(status)
// 文件大小、已下载大小
total, err := strconv.ParseUint(status.TotalLength, 10, 64)
@ -235,6 +236,40 @@ func (monitor *Monitor) RemoveTempFolder() {
// Complete 完成下载,返回是否中断监控
func (monitor *Monitor) Complete(pool task.Pool) bool {
// 未开始转存,提交转存任务
if monitor.Task.TaskID == 0 {
return monitor.transfer(pool)
}
// 做种完成
if common.GetStatus(monitor.Task.StatusInfo) == common.Complete {
transferTask, err := model.GetTasksByID(monitor.Task.TaskID)
if err != nil {
monitor.setErrorStatus(err)
monitor.RemoveTempFolder()
return true
}
// 转存完成,回收下载目录
if transferTask.Type == task.TransferTaskType && transferTask.Status >= task.Error {
job, err := task.NewRecycleTask(monitor.Task)
if err != nil {
monitor.setErrorStatus(err)
monitor.RemoveTempFolder()
return true
}
// 提交回收任务
pool.Submit(job)
return true
}
}
return false
}
func (monitor *Monitor) transfer(pool task.Pool) bool {
// 创建中转任务
file := make([]string, 0, len(monitor.Task.StatusInfo.Files))
sizes := make(map[string]uint64, len(monitor.Task.StatusInfo.Files))
@ -269,7 +304,7 @@ func (monitor *Monitor) Complete(pool task.Pool) bool {
monitor.Task.TaskID = job.Model().ID
monitor.Task.Save()
return true
return false
}
func (monitor *Monitor) setErrorStatus(err error) {

@ -3,6 +3,8 @@ package monitor
import (
"database/sql"
"errors"
"testing"
"github.com/DATA-DOG/go-sqlmock"
model "github.com/cloudreve/Cloudreve/v3/models"
"github.com/cloudreve/Cloudreve/v3/pkg/aria2/common"
@ -13,7 +15,6 @@ import (
"github.com/jinzhu/gorm"
"github.com/stretchr/testify/assert"
testMock "github.com/stretchr/testify/mock"
"testing"
)
var mock sqlmock.Sqlmock
@ -431,6 +432,14 @@ func TestMonitor_Complete(t *testing.T) {
mock.ExpectExec("UPDATE(.+)downloads").WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectCommit()
mock.ExpectQuery("SELECT(.+)tasks").WillReturnRows(sqlmock.NewRows([]string{"id", "type", "status"}).AddRow(1, 2, 4))
mock.ExpectQuery("SELECT(.+)users").WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow(9414))
mock.ExpectBegin()
mock.ExpectExec("INSERT(.+)tasks").WillReturnResult(sqlmock.NewResult(2, 1))
mock.ExpectCommit()
a.False(m.Complete(mockPool))
m.Task.StatusInfo.Status = "complete"
a.True(m.Complete(mockPool))
a.NoError(mock.ExpectationsWereMet())
mockNode.AssertExpectations(t)

@ -24,15 +24,7 @@ type StatusInfo struct {
BelongsTo string `json:"belongsTo"` // GID of a parent download. Some downloads are a part of another download. For example, if a file in a Metalink has BitTorrent resources, the downloads of ".torrent" files are parts of that parent. If this download has no parent, this key will not be included in the response.
Dir string `json:"dir"` // Directory to save files.
Files []FileInfo `json:"files"` // Returns the list of files. The elements of this list are the same structs used in aria2.getFiles() method.
BitTorrent struct {
AnnounceList [][]string `json:"announceList"` // List of lists of announce URIs. If the torrent contains announce and no announce-list, announce is converted to the announce-list format.
Comment string `json:"comment"` // The comment of the torrent. comment.utf-8 is used if available.
CreationDate int64 `json:"creationDate"` // The creation time of the torrent. The value is an integer since the epoch, measured in seconds.
Mode string `json:"mode"` // File mode of the torrent. The value is either single or multi.
Info struct {
Name string `json:"name"` // name in info dictionary. name.utf-8 is used if available.
} `json:"info"` // Struct which contains data from Info dictionary. It contains following keys.
} `json:"bittorrent"` // Struct which contains information retrieved from the .torrent (file). BitTorrent only. It contains following keys.
BitTorrent BitTorrentInfo `json:"bittorrent"` // Struct which contains information retrieved from the .torrent (file). BitTorrent only. It contains following keys.
}
// URIInfo represents an element of response of aria2.getUris
@ -100,3 +92,13 @@ type Method struct {
Name string `json:"methodName"` // Method name to call
Params []interface{} `json:"params"` // Array containing parameters to the method call
}
type BitTorrentInfo struct {
AnnounceList [][]string `json:"announceList"` // List of lists of announce URIs. If the torrent contains announce and no announce-list, announce is converted to the announce-list format.
Comment string `json:"comment"` // The comment of the torrent. comment.utf-8 is used if available.
CreationDate int64 `json:"creationDate"` // The creation time of the torrent. The value is an integer since the epoch, measured in seconds.
Mode string `json:"mode"` // File mode of the torrent. The value is either single or multi.
Info struct {
Name string `json:"name"` // name in info dictionary. name.utf-8 is used if available.
} `json:"info"` // Struct which contains data from Info dictionary. It contains following keys.
}

@ -17,10 +17,10 @@ import (
)
var (
ErrAuthFailed = serializer.NewError(serializer.CodeNoPermissionErr, "鉴权失败", nil)
ErrAuthFailed = serializer.NewError(serializer.CodeInvalidSign, "invalid sign", nil)
ErrAuthHeaderMissing = serializer.NewError(serializer.CodeNoPermissionErr, "authorization header is missing", nil)
ErrExpiresMissing = serializer.NewError(serializer.CodeNoPermissionErr, "expire timestamp is missing", nil)
ErrExpired = serializer.NewError(serializer.CodeSignExpired, "签名已过期", nil)
ErrExpired = serializer.NewError(serializer.CodeSignExpired, "signature expired", nil)
)
const CrHeaderPrefix = "X-Cr-"
@ -136,7 +136,7 @@ func Init() {
} else {
secretKey = conf.SlaveConfig.Secret
if secretKey == "" {
util.Log().Panic("未指定 SlaveSecret请前往配置文件中指定")
util.Log().Panic("SlaveSecret is not set, please specify it in config file.")
}
}
General = HMACAuth{

@ -24,7 +24,7 @@ func Init(isSlave bool) {
if isSlave {
err := Store.Sets(conf.OptionOverwrite, "setting_")
if err != nil {
util.Log().Warning("无法覆盖数据库设置: %s", err)
util.Log().Warning("Failed to overwrite database setting: %s", err)
}
}
}

2
pkg/cache/memo.go vendored

@ -53,7 +53,7 @@ func (store *MemoStore) GarbageCollect() {
store.Store.Range(func(key, value interface{}) bool {
if item, ok := value.(itemWithTTL); ok {
if item.expires > 0 && item.expires < time.Now().Unix() {
util.Log().Debug("回收垃圾[%s]", key.(string))
util.Log().Debug("Cache %q is garbage collected.", key.(string))
store.Store.Delete(key)
}
}

@ -60,7 +60,7 @@ func NewRedisStore(size int, network, address, password string, database int) *R
redis.DialPassword(password),
)
if err != nil {
util.Log().Warning("无法创建Redis连接%s", err)
util.Log().Warning("Failed to create Redis connection: %s", err)
return nil, err
}
return c, nil

@ -8,5 +8,5 @@ import (
var (
ErrFeatureNotExist = errors.New("No nodes in nodepool match the feature specificed")
ErrIlegalPath = errors.New("path out of boundary of setting temp folder")
ErrMasterNotFound = serializer.NewError(serializer.CodeMasterNotFound, "未知的主机节点", nil)
ErrMasterNotFound = serializer.NewError(serializer.CodeMasterNotFound, "Unknown master node id", nil)
)

@ -161,7 +161,7 @@ func (r *rpcService) Init() error {
// 解析RPC服务地址
server, err := url.Parse(r.parent.Model.Aria2OptionsSerialized.Server)
if err != nil {
util.Log().Warning("无法解析主机 Aria2 RPC 服务地址,%s", err)
util.Log().Warning("Failed to parse Aria2 RPC server URL: %s", err)
return err
}
server.Path = "/jsonrpc"
@ -171,7 +171,7 @@ func (r *rpcService) Init() error {
if r.parent.Model.Aria2OptionsSerialized.Options != "" {
err = json.Unmarshal([]byte(r.parent.Model.Aria2OptionsSerialized.Options), &globalOptions)
if err != nil {
util.Log().Warning("无法解析主机 Aria2 配置,%s", err)
util.Log().Warning("Failed to parse aria2 options: %s", err)
return err
}
}
@ -221,7 +221,7 @@ func (r *rpcService) Status(task *model.Download) (rpc.StatusInfo, error) {
res, err := r.Caller.TellStatus(task.GID)
if err != nil {
// 失败后重试
util.Log().Debug("无法获取离线下载状态,%s稍后重试", err)
util.Log().Debug("Failed to get download task status, please retry later: %s", err)
time.Sleep(r.retryDuration)
res, err = r.Caller.TellStatus(task.GID)
}
@ -233,7 +233,7 @@ func (r *rpcService) Cancel(task *model.Download) error {
// 取消下载任务
_, err := r.Caller.Remove(task.GID)
if err != nil {
util.Log().Warning("无法取消离线下载任务[%s], %s", task.GID, err)
util.Log().Warning("Failed to cancel task %q: %s", task.GID, err)
}
return err
@ -264,7 +264,7 @@ func (s *rpcService) DeleteTempFile(task *model.Download) error {
time.Sleep(d)
err := os.RemoveAll(src)
if err != nil {
util.Log().Warning("无法删除离线下载临时目录[%s], %s", src, err)
util.Log().Warning("Failed to delete temp download folder: %q: %s", src, err)
}
}(s.deletePaddingDuration, task.Parent)

@ -42,7 +42,7 @@ func Init() {
Default = &NodePool{}
Default.Init()
if err := Default.initFromDB(); err != nil {
util.Log().Warning("节点池初始化失败, %s", err)
util.Log().Warning("Failed to initialize node pool: %s", err)
}
}
@ -83,7 +83,7 @@ func (pool *NodePool) GetNodeByID(id uint) Node {
}
func (pool *NodePool) nodeStatusChange(isActive bool, id uint) {
util.Log().Debug("从机节点 [ID=%d] 状态变更 [Active=%t]", id, isActive)
util.Log().Debug("Slave node [ID=%d] status changed to [Active=%t].", id, isActive)
var node Node
pool.lock.Lock()
if n, ok := pool.inactive[id]; ok {

@ -172,7 +172,7 @@ func (node *SlaveNode) StartPingLoop() {
recoverDuration := time.Duration(model.GetIntSetting("slave_recover_interval", 600)) * time.Second
pingTicker := time.Duration(0)
util.Log().Debug("从机节点 [%s] 启动心跳循环", node.Model.Name)
util.Log().Debug("Slave node %q heartbeat loop started.", node.Model.Name)
retry := 0
recoverMode := false
isFirstLoop := true
@ -185,39 +185,39 @@ loop:
pingTicker = tickDuration
}
util.Log().Debug("从机节点 [%s] 发送Ping", node.Model.Name)
util.Log().Debug("Slave node %q send ping.", node.Model.Name)
res, err := node.Ping(node.getHeartbeatContent(isFirstLoop))
isFirstLoop = false
if err != nil {
util.Log().Debug("Ping从机节点 [%s] 时发生错误: %s", node.Model.Name, err)
util.Log().Debug("Error while ping slave node %q: %s", node.Model.Name, err)
retry++
if retry >= model.GetIntSetting("slave_node_retry", 3) {
util.Log().Debug("从机节点 [%s] Ping 重试已达到最大限制,将从机节点标记为不可用", node.Model.Name)
util.Log().Debug("Retry threshold for pinging slave node %q exceeded, mark it as offline.", node.Model.Name)
node.changeStatus(false)
if !recoverMode {
// 启动恢复监控循环
util.Log().Debug("从机节点 [%s] 进入恢复模式", node.Model.Name)
util.Log().Debug("Slave node %q entered recovery mode.", node.Model.Name)
pingTicker = recoverDuration
recoverMode = true
}
}
} else {
if recoverMode {
util.Log().Debug("从机节点 [%s] 复活", node.Model.Name)
util.Log().Debug("Slave node %q recovered.", node.Model.Name)
pingTicker = tickDuration
recoverMode = false
isFirstLoop = true
}
util.Log().Debug("从机节点 [%s] 状态: %s", node.Model.Name, res)
util.Log().Debug("Status of slave node %q: %s", node.Model.Name, res)
node.changeStatus(true)
retry = 0
}
case <-node.close:
util.Log().Debug("从机节点 [%s] 收到关闭信号", node.Model.Name)
util.Log().Debug("Slave node %q received shutdown signal.", node.Model.Name)
break loop
}
}
@ -421,7 +421,7 @@ func RemoteCallback(url string, body serializer.UploadCallback) error {
Data: body,
})
if err != nil {
return serializer.NewError(serializer.CodeCallbackError, "无法编码回调正文", err)
return serializer.NewError(serializer.CodeCallbackError, "Failed to encode callback content", err)
}
resp := request.GeneralClient.Request(
@ -433,13 +433,13 @@ func RemoteCallback(url string, body serializer.UploadCallback) error {
)
if resp.Err != nil {
return serializer.NewError(serializer.CodeCallbackError, "从机无法发起回调请求", resp.Err)
return serializer.NewError(serializer.CodeCallbackError, "Slave cannot send callback request", resp.Err)
}
// 解析回调服务端响应
response, err := resp.DecodeResponse()
if err != nil {
msg := fmt.Sprintf("从机无法解析主机返回的响应 (StatusCode=%d)", resp.Response.StatusCode)
msg := fmt.Sprintf("Slave cannot parse callback response from master (StatusCode=%d).", resp.Response.StatusCode)
return serializer.NewError(serializer.CodeCallbackError, msg, err)
}

@ -86,13 +86,13 @@ func Init(path string) {
}, defaultConf)
f, err := util.CreatNestedFile(path)
if err != nil {
util.Log().Panic("无法创建配置文件, %s", err)
util.Log().Panic("Failed to create config file: %s", err)
}
// 写入配置文件
_, err = f.WriteString(confContent)
if err != nil {
util.Log().Panic("无法写入配置文件, %s", err)
util.Log().Panic("Failed to write config file: %s", err)
}
f.Close()
@ -100,7 +100,7 @@ func Init(path string) {
cfg, err = ini.Load(path)
if err != nil {
util.Log().Panic("无法解析配置文件 '%s': %s", path, err)
util.Log().Panic("Failed to parse config file %q: %s", path, err)
}
sections := map[string]interface{}{
@ -115,7 +115,7 @@ func Init(path string) {
for sectionName, sectionStruct := range sections {
err = mapSection(sectionName, sectionStruct)
if err != nil {
util.Log().Panic("配置文件 %s 分区解析失败: %s", sectionName, err)
util.Log().Panic("Failed to parse config section %q: %s", sectionName, err)
}
}

@ -22,7 +22,7 @@ func garbageCollect() {
collectCache(store)
}
util.Log().Info("定时任务 [cron_garbage_collect] 执行完毕")
util.Log().Info("Crontab job \"cron_garbage_collect\" complete.")
}
func collectArchiveFile() {
@ -36,23 +36,23 @@ func collectArchiveFile() {
if err == nil && !info.IsDir() &&
strings.HasPrefix(filepath.Base(path), "archive_") &&
time.Now().Sub(info.ModTime()).Seconds() > float64(expires) {
util.Log().Debug("删除过期打包下载临时文件 [%s]", path)
util.Log().Debug("Delete expired batch download temp file %q.", path)
// 删除符合条件的文件
if err := os.Remove(path); err != nil {
util.Log().Debug("临时文件 [%s] 删除失败 , %s", path, err)
util.Log().Debug("Failed to delete temp file %q: %s", path, err)
}
}
return nil
})
if err != nil {
util.Log().Debug("[定时任务] 无法列取临时打包目录")
util.Log().Debug("Crontab job cannot list temp batch download folder: %s", err)
}
}
func collectCache(store *cache.MemoStore) {
util.Log().Debug("清理内存缓存")
util.Log().Debug("Cleanup memory cache.")
store.GarbageCollect()
}
@ -78,22 +78,22 @@ func uploadSessionCollect() {
for uid, filesIDs := range userToFiles {
user, err := model.GetUserByID(uid)
if err != nil {
util.Log().Warning("上传会话所属用户不存在, %s", err)
util.Log().Warning("Owner of the upload session cannot be found: %s", err)
continue
}
fs, err := filesystem.NewFileSystem(&user)
if err != nil {
util.Log().Warning("无法初始化文件系统, %s", err)
util.Log().Warning("Failed to initialize filesystem: %s", err)
continue
}
if err = fs.Delete(context.Background(), []uint{}, filesIDs, false); err != nil {
util.Log().Warning("无法删除上传会话, %s", err)
util.Log().Warning("Failed to delete upload session: %s", err)
}
fs.Recycle()
}
util.Log().Info("定时任务 [cron_recycle_upload_session] 执行完毕")
util.Log().Info("Crontab job \"cron_recycle_upload_session\" complete.")
}

@ -19,7 +19,7 @@ func Reload() {
// Init 初始化定时任务
func Init() {
util.Log().Info("初始化定时任务...")
util.Log().Info("Initialize crontab jobs...")
// 读取cron日程设置
options := model.GetSettingByNames(
"cron_garbage_collect",
@ -34,12 +34,12 @@ func Init() {
case "cron_recycle_upload_session":
handler = uploadSessionCollect
default:
util.Log().Warning("未知定时任务类型 [%s],跳过", k)
util.Log().Warning("Unknown crontab job type %q, skipping...", k)
continue
}
if _, err := Cron.AddFunc(v, handler); err != nil {
util.Log().Warning("无法启动定时任务 [%s] , %s", k, err)
util.Log().Warning("Failed to start crontab job %q: %s", k, err)
}
}

@ -15,7 +15,7 @@ var Lock sync.RWMutex
// Init 初始化
func Init() {
util.Log().Debug("邮件队列初始化")
util.Log().Debug("Initializing email sending queue...")
Lock.Lock()
defer Lock.Unlock()

@ -15,9 +15,9 @@ type Driver interface {
var (
// ErrChanNotOpen 邮件队列未开启
ErrChanNotOpen = errors.New("邮件队列未开启")
ErrChanNotOpen = errors.New("email queue is not started")
// ErrNoActiveDriver 无可用邮件发送服务
ErrNoActiveDriver = errors.New("无可用邮件发送服务")
ErrNoActiveDriver = errors.New("no avaliable email provider")
)
// Send 发送邮件

@ -68,7 +68,7 @@ func (client *SMTP) Init() {
defer func() {
if err := recover(); err != nil {
client.chOpen = false
util.Log().Error("邮件发送队列出现异常, %s ,10 秒后重置", err)
util.Log().Error("Exception while sending email: %s, queue will be reset in 10 seconds.", err)
time.Sleep(time.Duration(10) * time.Second)
client.Init()
}
@ -91,7 +91,7 @@ func (client *SMTP) Init() {
select {
case m, ok := <-client.ch:
if !ok {
util.Log().Debug("邮件队列关闭")
util.Log().Debug("Email queue closing...")
client.chOpen = false
return
}
@ -102,15 +102,15 @@ func (client *SMTP) Init() {
open = true
}
if err := mail.Send(s, m); err != nil {
util.Log().Warning("邮件发送失败, %s", err)
util.Log().Warning("Failed to send email: %s", err)
} else {
util.Log().Debug("邮件已发送")
util.Log().Debug("Email sent.")
}
// 长时间没有新邮件则关闭SMTP连接
case <-time.After(time.Duration(client.Config.Keepalive) * time.Second):
if open {
if err := s.Close(); err != nil {
util.Log().Warning("无法关闭 SMTP 连接 %s", err)
util.Log().Warning("Failed to close SMTP connection: %s", err)
}
open = false
}

@ -107,7 +107,7 @@ func (fs *FileSystem) doCompress(ctx context.Context, file *model.File, folder *
fs.Policy = file.GetPolicy()
err := fs.DispatchHandler()
if err != nil {
util.Log().Warning("无法压缩文件%s%s", file.Name, err)
util.Log().Warning("Failed to compress file %q: %s", file.Name, err)
return
}
@ -117,7 +117,7 @@ func (fs *FileSystem) doCompress(ctx context.Context, file *model.File, folder *
file.SourceName,
)
if err != nil {
util.Log().Debug("Open%s%s", file.Name, err)
util.Log().Debug("Failed to open %q: %s", file.Name, err)
return
}
if closer, ok := fileToZip.(io.Closer); ok {
@ -176,7 +176,7 @@ func (fs *FileSystem) Decompress(ctx context.Context, src, dst, encoding string)
// 结束时删除临时压缩文件
if tempZipFilePath != "" {
if err := os.Remove(tempZipFilePath); err != nil {
util.Log().Warning("无法删除临时压缩文件 %s , %s", tempZipFilePath, err)
util.Log().Warning("Failed to delete temp archive file %q: %s", tempZipFilePath, err)
}
}
}()
@ -197,7 +197,7 @@ func (fs *FileSystem) Decompress(ctx context.Context, src, dst, encoding string)
zipFile, err := util.CreatNestedFile(tempZipFilePath)
if err != nil {
util.Log().Warning("无法创建临时压缩文件 %s , %s", tempZipFilePath, err)
util.Log().Warning("Failed to create temp archive file %q: %s", tempZipFilePath, err)
tempZipFilePath = ""
return err
}
@ -206,7 +206,7 @@ func (fs *FileSystem) Decompress(ctx context.Context, src, dst, encoding string)
// 下载前先判断是否是可解压的格式
format, readStream, err := archiver.Identify(fs.FileTarget[0].SourceName, fileStream)
if err != nil {
util.Log().Warning("无法识别文件格式 %s , %s", fs.FileTarget[0].SourceName, err)
util.Log().Warning("Failed to detect compressed format of file %q: %s", fs.FileTarget[0].SourceName, err)
return err
}
@ -228,7 +228,7 @@ func (fs *FileSystem) Decompress(ctx context.Context, src, dst, encoding string)
if isZip {
_, err = io.Copy(zipFile, readStream)
if err != nil {
util.Log().Warning("无法写入临时压缩文件 %s , %s", tempZipFilePath, err)
util.Log().Warning("Failed to write temp archive file %q: %s", tempZipFilePath, err)
return err
}
@ -261,7 +261,7 @@ func (fs *FileSystem) Decompress(ctx context.Context, src, dst, encoding string)
wg.Done()
}
if err := recover(); err != nil {
util.Log().Warning("上传压缩包内文件时出错")
util.Log().Warning("Error while uploading files inside of archive file.")
fmt.Println(err)
}
}()
@ -274,7 +274,7 @@ func (fs *FileSystem) Decompress(ctx context.Context, src, dst, encoding string)
}, true)
fileStream.Close()
if err != nil {
util.Log().Debug("无法上传压缩包内的文件%s , %s , 跳过", rawPath, err)
util.Log().Debug("Failed to upload file %q in archive file: %s, skipping...", rawPath, err)
}
}
@ -297,7 +297,7 @@ func (fs *FileSystem) Decompress(ctx context.Context, src, dst, encoding string)
// 上传文件
fileStream, err := f.Open()
if err != nil {
util.Log().Warning("无法打开压缩包内文件%s , %s , 跳过", rawPath, err)
util.Log().Warning("Failed to open file %q in archive file: %s, skipping...", rawPath, err)
return nil
}

@ -1,14 +1,22 @@
package backoff
import "time"
import (
"errors"
"fmt"
"github.com/cloudreve/Cloudreve/v3/pkg/util"
"net/http"
"strconv"
"time"
)
// Backoff used for retry sleep backoff
type Backoff interface {
Next() bool
Next(err error) bool
Reset()
}
// ConstantBackoff implements Backoff interface with constant sleep time
// ConstantBackoff implements Backoff interface with constant sleep time. If the error
// is retryable and with `RetryAfter` defined, the `RetryAfter` will be used as sleep duration.
type ConstantBackoff struct {
Sleep time.Duration
Max int
@ -16,16 +24,51 @@ type ConstantBackoff struct {
tried int
}
func (c *ConstantBackoff) Next() bool {
func (c *ConstantBackoff) Next(err error) bool {
c.tried++
if c.tried > c.Max {
return false
}
var e *RetryableError
if errors.As(err, &e) && e.RetryAfter > 0 {
util.Log().Warning("Retryable error %q occurs in backoff, will sleep after %s.", e, e.RetryAfter)
time.Sleep(e.RetryAfter)
} else {
time.Sleep(c.Sleep)
}
return true
}
func (c *ConstantBackoff) Reset() {
c.tried = 0
}
type RetryableError struct {
Err error
RetryAfter time.Duration
}
// NewRetryableErrorFromHeader constructs a new RetryableError from http response header
// and existing error.
func NewRetryableErrorFromHeader(err error, header http.Header) *RetryableError {
retryAfter := header.Get("retry-after")
if retryAfter == "" {
retryAfter = "0"
}
res := &RetryableError{
Err: err,
}
if retryAfterSecond, err := strconv.ParseInt(retryAfter, 10, 64); err == nil {
res.RetryAfter = time.Duration(retryAfterSecond) * time.Second
}
return res
}
func (e *RetryableError) Error() string {
return fmt.Sprintf("retryable error with retry-after=%s: %s", e.RetryAfter, e.Err)
}

@ -1,7 +1,9 @@
package backoff
import (
"errors"
"github.com/stretchr/testify/assert"
"net/http"
"testing"
"time"
)
@ -9,14 +11,51 @@ import (
func TestConstantBackoff_Next(t *testing.T) {
a := assert.New(t)
// General error
{
err := errors.New("error")
b := &ConstantBackoff{Sleep: time.Duration(0), Max: 3}
a.True(b.Next())
a.True(b.Next())
a.True(b.Next())
a.False(b.Next())
a.True(b.Next(err))
a.True(b.Next(err))
a.True(b.Next(err))
a.False(b.Next(err))
b.Reset()
a.True(b.Next())
a.True(b.Next())
a.True(b.Next())
a.False(b.Next())
a.True(b.Next(err))
a.True(b.Next(err))
a.True(b.Next(err))
a.False(b.Next(err))
}
// Retryable error
{
err := &RetryableError{RetryAfter: time.Duration(1)}
b := &ConstantBackoff{Sleep: time.Duration(0), Max: 3}
a.True(b.Next(err))
a.True(b.Next(err))
a.True(b.Next(err))
a.False(b.Next(err))
b.Reset()
a.True(b.Next(err))
a.True(b.Next(err))
a.True(b.Next(err))
a.False(b.Next(err))
}
}
func TestNewRetryableErrorFromHeader(t *testing.T) {
a := assert.New(t)
// no retry-after header
{
err := NewRetryableErrorFromHeader(nil, http.Header{})
a.Empty(err.RetryAfter)
}
// with retry-after header
{
header := http.Header{}
header.Add("retry-after", "120")
err := NewRetryableErrorFromHeader(nil, header)
a.EqualValues(time.Duration(120)*time.Second, err.RetryAfter)
}
}

@ -5,6 +5,7 @@ import (
"fmt"
"github.com/cloudreve/Cloudreve/v3/pkg/filesystem/chunk/backoff"
"github.com/cloudreve/Cloudreve/v3/pkg/filesystem/fsctx"
"github.com/cloudreve/Cloudreve/v3/pkg/request"
"github.com/cloudreve/Cloudreve/v3/pkg/util"
"io"
"os"
@ -66,7 +67,7 @@ func (c *ChunkGroup) TempAvailable() bool {
// Process a chunk with retry logic
func (c *ChunkGroup) Process(processor ChunkProcessFunc) error {
reader := io.LimitReader(c.file, int64(c.chunkSize))
reader := io.LimitReader(c.file, c.Length())
// If useBuffer is enabled, tee the reader to a temp file
if c.enableRetryBuffer && c.bufferTemp == nil && !c.file.Seekable() {
@ -90,13 +91,17 @@ func (c *ChunkGroup) Process(processor ChunkProcessFunc) error {
}
util.Log().Debug("Chunk %d will be read from temp file %q.", c.Index(), c.bufferTemp.Name())
reader = c.bufferTemp
reader = io.NopCloser(c.bufferTemp)
}
}
err := processor(c, reader)
if err != nil {
if err != context.Canceled && (c.file.Seekable() || c.TempAvailable()) && c.backoff.Next() {
if c.enableRetryBuffer {
request.BlackHole(reader)
}
if err != context.Canceled && (c.file.Seekable() || c.TempAvailable()) && c.backoff.Next(err) {
if c.file.Seekable() {
if _, seekErr := c.file.Seek(c.Start(), io.SeekStart); seekErr != nil {
return fmt.Errorf("failed to seek back to chunk start: %w, last error: %s", seekErr, err)

@ -218,7 +218,7 @@ func (handler Driver) Delete(ctx context.Context, files []string) ([]string, err
return failed, nil
}
return failed, errors.New("删除失败")
return failed, errors.New("delete failed")
}
// Thumb 获取文件缩略图

@ -43,7 +43,7 @@ func (handler Driver) List(ctx context.Context, path string, recursive bool) ([]
}
if err != nil {
util.Log().Warning("无法遍历目录 %s, %s", path, err)
util.Log().Warning("Failed to walk folder %q: %s", path, err)
return filepath.SkipDir
}
@ -78,7 +78,7 @@ func (handler Driver) Get(ctx context.Context, path string) (response.RSCloser,
// 打开文件
file, err := os.Open(util.RelativePath(path))
if err != nil {
util.Log().Debug("无法打开文件:%s", err)
util.Log().Debug("Failed to open file: %s", err)
return nil, err
}
@ -94,8 +94,8 @@ func (handler Driver) Put(ctx context.Context, file fsctx.FileHeader) error {
// 如果非 Overwrite则检查是否有重名冲突
if fileInfo.Mode&fsctx.Overwrite != fsctx.Overwrite {
if util.Exists(dst) {
util.Log().Warning("物理同名文件已存在或不可用: %s", dst)
return errors.New("物理同名文件已存在或不可用")
util.Log().Warning("File with the same name existed or unavailable: %s", dst)
return errors.New("file with the same name existed or unavailable")
}
}
@ -104,7 +104,7 @@ func (handler Driver) Put(ctx context.Context, file fsctx.FileHeader) error {
if !util.Exists(basePath) {
err := os.MkdirAll(basePath, Perm)
if err != nil {
util.Log().Warning("无法创建目录,%s", err)
util.Log().Warning("Failed to create directory: %s", err)
return err
}
}
@ -123,7 +123,7 @@ func (handler Driver) Put(ctx context.Context, file fsctx.FileHeader) error {
out, err = os.OpenFile(dst, openMode, Perm)
if err != nil {
util.Log().Warning("无法打开或创建文件,%s", err)
util.Log().Warning("Failed to open or create file: %s", err)
return err
}
defer out.Close()
@ -131,22 +131,22 @@ func (handler Driver) Put(ctx context.Context, file fsctx.FileHeader) error {
if fileInfo.Mode&fsctx.Append == fsctx.Append {
stat, err := out.Stat()
if err != nil {
util.Log().Warning("无法读取文件信息,%s", err)
util.Log().Warning("Failed to read file info: %s", err)
return err
}
if uint64(stat.Size()) < fileInfo.AppendStart {
return errors.New("未上传完成的文件分片与预期大小不一致")
return errors.New("size of unfinished uploaded chunks is not as expected")
} else if uint64(stat.Size()) > fileInfo.AppendStart {
out.Close()
if err := handler.Truncate(ctx, dst, fileInfo.AppendStart); err != nil {
return fmt.Errorf("覆盖分片时发生错误: %w", err)
return fmt.Errorf("failed to overwrite chunk: %w", err)
}
out, err = os.OpenFile(dst, openMode, Perm)
defer out.Close()
if err != nil {
util.Log().Warning("无法打开或创建文件,%s", err)
util.Log().Warning("Failed to create or open file: %s", err)
return err
}
}
@ -158,10 +158,10 @@ func (handler Driver) Put(ctx context.Context, file fsctx.FileHeader) error {
}
func (handler Driver) Truncate(ctx context.Context, src string, size uint64) error {
util.Log().Warning("截断文件 [%s] 至 [%d]", src, size)
util.Log().Warning("Truncate file %q to [%d].", src, size)
out, err := os.OpenFile(src, os.O_WRONLY, Perm)
if err != nil {
util.Log().Warning("无法打开文件,%s", err)
util.Log().Warning("Failed to open file: %s", err)
return err
}
@ -180,7 +180,7 @@ func (handler Driver) Delete(ctx context.Context, files []string) ([]string, err
if util.Exists(filePath) {
err := os.Remove(filePath)
if err != nil {
util.Log().Warning("无法删除文件,%s", err)
util.Log().Warning("Failed to delete file: %s", err)
retErr = err
deleteFailed = append(deleteFailed, value)
}
@ -217,7 +217,7 @@ func (handler Driver) Source(
) (string, error) {
file, ok := ctx.Value(fsctx.FileModelCtx).(model.File)
if !ok {
return "", errors.New("无法获取文件记录上下文")
return "", errors.New("failed to read file model context")
}
// 是否启用了CDN
@ -238,7 +238,7 @@ func (handler Driver) Source(
downloadSessionID := util.RandStringRunes(16)
err = cache.Set("download_"+downloadSessionID, file, int(ttl))
if err != nil {
return "", serializer.NewError(serializer.CodeCacheOperation, "无法创建下载会话", err)
return "", serializer.NewError(serializer.CodeCacheOperation, "Failed to create download session", err)
}
// 签名生成文件记录
@ -257,7 +257,7 @@ func (handler Driver) Source(
}
if err != nil {
return "", serializer.NewError(serializer.CodeEncryptError, "无法对URL进行签名", err)
return "", serializer.NewError(serializer.CodeEncryptError, "Failed to sign url", err)
}
finalURL := baseURL.ResolveReference(signedURI).String()

@ -36,7 +36,7 @@ func TestHandler_Put(t *testing.T) {
{&fsctx.FileStream{
SavePath: "TestHandler_Put.txt",
File: io.NopCloser(strings.NewReader("")),
}, "物理同名文件已存在或不可用"},
}, "file with the same name existed or unavailable"},
{&fsctx.FileStream{
SavePath: "inner/TestHandler_Put.txt",
File: io.NopCloser(strings.NewReader("")),
@ -51,7 +51,7 @@ func TestHandler_Put(t *testing.T) {
Mode: fsctx.Append | fsctx.Overwrite,
SavePath: "inner/TestHandler_Put.txt",
File: io.NopCloser(strings.NewReader("123")),
}, "未上传完成的文件分片与预期大小不一致"},
}, "size of unfinished uploaded chunks is not as expected"},
{&fsctx.FileStream{
Mode: fsctx.Append | fsctx.Overwrite,
SavePath: "inner/TestHandler_Put.txt",

@ -7,7 +7,6 @@ import (
"fmt"
"github.com/cloudreve/Cloudreve/v3/pkg/conf"
"io"
"io/ioutil"
"net/http"
"net/url"
"path"
@ -37,24 +36,18 @@ const (
// GetSourcePath 获取文件的绝对路径
func (info *FileInfo) GetSourcePath() string {
res, err := url.PathUnescape(
strings.TrimPrefix(
res, err := url.PathUnescape(info.ParentReference.Path)
if err != nil {
return ""
}
return strings.TrimPrefix(
path.Join(
strings.TrimPrefix(info.ParentReference.Path, "/drive/root:"),
strings.TrimPrefix(res, "/drive/root:"),
info.Name,
),
"/",
),
)
if err != nil {
return ""
}
return res
}
// Error 实现error接口
func (err RespError) Error() string {
return err.APIError.Message
}
func (client *Client) getRequestURL(api string, opts ...Option) string {
@ -95,7 +88,7 @@ func (client *Client) ListChildren(ctx context.Context, path string) ([]FileInfo
}
if retried < ListRetry {
retried++
util.Log().Debug("路径[%s]列取请求失败[%s]5秒钟后重试", path, err)
util.Log().Debug("Failed to list path %q: %s, will retry in 5 seconds.", path, err)
time.Sleep(time.Duration(5) * time.Second)
return client.ListChildren(context.WithValue(ctx, fsctx.RetryCtx, retried), path)
}
@ -445,7 +438,7 @@ func (client *Client) GetThumbURL(ctx context.Context, dst string, w, h uint) (s
}
}
return "", errors.New("无法生成缩略图")
return "", errors.New("failed to generate thumb")
}
// MonitorUpload 监控客户端分片上传进度
@ -460,39 +453,39 @@ func (client *Client) MonitorUpload(uploadURL, callbackKey, path string, size ui
for {
select {
case <-callbackChan:
util.Log().Debug("客户端完成回调")
util.Log().Debug("Client finished OneDrive callback.")
return
case <-time.After(time.Duration(ttl) * time.Second):
// 上传会话到期,仍未完成上传,创建占位符
client.DeleteUploadSession(context.Background(), uploadURL)
_, err := client.SimpleUpload(context.Background(), path, strings.NewReader(""), 0, WithConflictBehavior("replace"))
if err != nil {
util.Log().Debug("无法创建占位文件,%s", err)
util.Log().Debug("Failed to create placeholder file: %s", err)
}
return
case <-time.After(time.Duration(timeout) * time.Second):
util.Log().Debug("检查上传情况")
util.Log().Debug("Checking OneDrive upload status.")
status, err := client.GetUploadSessionStatus(context.Background(), uploadURL)
if err != nil {
if resErr, ok := err.(*RespError); ok {
if resErr.APIError.Code == "itemNotFound" {
util.Log().Debug("上传会话已完成,稍后检查回调")
util.Log().Debug("Upload completed, will check upload callback later.")
select {
case <-time.After(time.Duration(interval) * time.Second):
util.Log().Warning("未发送回调,删除文件")
util.Log().Warning("No callback is made, file will be deleted.")
cache.Deletes([]string{callbackKey}, "callback_")
_, err = client.Delete(context.Background(), []string{path})
if err != nil {
util.Log().Warning("无法删除未回调的文件,%s", err)
util.Log().Warning("Failed to delete file without callback: %s", err)
}
case <-callbackChan:
util.Log().Debug("客户端完成回调")
util.Log().Debug("Client finished callback.")
}
return
}
}
util.Log().Debug("无法获取上传会话状态,继续下一轮,%s", err.Error())
util.Log().Debug("Failed to get upload session status: %s, continue next iteration.", err.Error())
continue
}
@ -509,7 +502,7 @@ func (client *Client) MonitorUpload(uploadURL, callbackKey, path string, size ui
}
uploadFullSize, _ := strconv.ParseUint(sizeRange[1], 10, 64)
if (sizeRange[0] == "0" && sizeRange[1] == "") || uploadFullSize+1 != size {
util.Log().Debug("未开始上传或文件大小不一致,取消上传会话")
util.Log().Debug("Upload has not started, or uploaded file size not match, canceling upload session...")
// 取消上传会话实测OneDrive取消上传会话后客户端还是可以上传
// 所以上传一个空文件占位,阻止客户端上传
client.DeleteUploadSession(context.Background(), uploadURL)
@ -531,7 +524,7 @@ func sysError(err error) *RespError {
}}
}
func (client *Client) request(ctx context.Context, method string, url string, body io.Reader, option ...request.Option) (string, *RespError) {
func (client *Client) request(ctx context.Context, method string, url string, body io.Reader, option ...request.Option) (string, error) {
// 获取凭证
err := client.UpdateCredential(ctx, conf.SystemConfig.Mode == "slave")
if err != nil {
@ -577,18 +570,24 @@ func (client *Client) request(ctx context.Context, method string, url string, bo
if res.Response.StatusCode < 200 || res.Response.StatusCode >= 300 {
decodeErr = json.Unmarshal([]byte(respBody), &errResp)
if decodeErr != nil {
util.Log().Debug("Onedrive返回未知响应[%s]", respBody)
util.Log().Debug("Onedrive returns unknown response: %s", respBody)
return "", sysError(decodeErr)
}
if res.Response.StatusCode == 429 {
util.Log().Warning("OneDrive request is throttled.")
return "", backoff.NewRetryableErrorFromHeader(&errResp, res.Response.Header)
}
return "", &errResp
}
return respBody, nil
}
func (client *Client) requestWithStr(ctx context.Context, method string, url string, body string, expectedCode int) (string, *RespError) {
func (client *Client) requestWithStr(ctx context.Context, method string, url string, body string, expectedCode int) (string, error) {
// 发送请求
bodyReader := ioutil.NopCloser(strings.NewReader(body))
bodyReader := io.NopCloser(strings.NewReader(body))
return client.request(ctx, method, url, bodyReader,
request.WithContentLength(int64(len(body))),
)

@ -112,6 +112,35 @@ func TestRequest(t *testing.T) {
asserts.Equal("error msg", err.Error())
}
// OneDrive返回429错误
{
header := http.Header{}
header.Add("retry-after", "120")
clientMock := ClientMock{}
clientMock.On(
"Request",
"POST",
"http://dev.com",
testMock.Anything,
testMock.Anything,
).Return(&request.Response{
Err: nil,
Response: &http.Response{
StatusCode: 429,
Header: header,
Body: ioutil.NopCloser(strings.NewReader(`{"error":{"message":"error msg"}}`)),
},
})
client.Request = clientMock
res, err := client.request(context.Background(), "POST", "http://dev.com", strings.NewReader(""))
clientMock.AssertExpectations(t)
asserts.Error(err)
asserts.Empty(res)
var retryErr *backoff.RetryableError
asserts.ErrorAs(err, &retryErr)
asserts.EqualValues(time.Duration(120)*time.Second, retryErr.RetryAfter)
}
// OneDrive返回未知响应
{
clientMock := ClientMock{}
@ -144,18 +173,18 @@ func TestFileInfo_GetSourcePath(t *testing.T) {
fileInfo := FileInfo{
Name: "%e6%96%87%e4%bb%b6%e5%90%8d.jpg",
ParentReference: parentReference{
Path: "/drive/root:/123/321",
Path: "/drive/root:/123/32%201",
},
}
asserts.Equal("123/321/文件名.jpg", fileInfo.GetSourcePath())
asserts.Equal("123/32 1/%e6%96%87%e4%bb%b6%e5%90%8d.jpg", fileInfo.GetSourcePath())
}
// 失败
{
fileInfo := FileInfo{
Name: "%e6%96%87%e4%bb%b6%e5%90%8g.jpg",
Name: "123.jpg",
ParentReference: parentReference{
Path: "/drive/root:/123/321",
Path: "/drive/root:/123/%e6%96%87%e4%bb%b6%e5%90%8g",
},
}
asserts.Equal("", fileInfo.GetSourcePath())

@ -10,13 +10,13 @@ import (
var (
// ErrAuthEndpoint 无法解析授权端点地址
ErrAuthEndpoint = errors.New("无法解析授权端点地址")
ErrAuthEndpoint = errors.New("failed to parse endpoint url")
// ErrInvalidRefreshToken 上传策略无有效的RefreshToken
ErrInvalidRefreshToken = errors.New("上传策略无有效的RefreshToken")
ErrInvalidRefreshToken = errors.New("no valid refresh token in this policy")
// ErrDeleteFile 无法删除文件
ErrDeleteFile = errors.New("无法删除文件")
ErrDeleteFile = errors.New("cannot delete file")
// ErrClientCanceled 客户端取消操作
ErrClientCanceled = errors.New("客户端取消操作")
ErrClientCanceled = errors.New("client canceled")
)
// Client OneDrive客户端

@ -11,7 +11,6 @@ import (
"time"
model "github.com/cloudreve/Cloudreve/v3/models"
"github.com/cloudreve/Cloudreve/v3/pkg/auth"
"github.com/cloudreve/Cloudreve/v3/pkg/cache"
"github.com/cloudreve/Cloudreve/v3/pkg/filesystem/driver"
"github.com/cloudreve/Cloudreve/v3/pkg/filesystem/fsctx"
@ -143,7 +142,7 @@ func (handler Driver) Thumb(ctx context.Context, path string) (*response.Content
ok = false
)
if thumbSize, ok = ctx.Value(fsctx.ThumbSizeCtx).([2]uint); !ok {
return nil, errors.New("无法获取缩略图尺寸设置")
return nil, errors.New("failed to get thumbnail size")
}
res, err := handler.Client.GetThumbURL(ctx, path, thumbSize[0], thumbSize[1])
@ -171,19 +170,6 @@ func (handler Driver) Source(
cacheKey := fmt.Sprintf("onedrive_source_%d_%s", handler.Policy.ID, path)
if file, ok := ctx.Value(fsctx.FileModelCtx).(model.File); ok {
cacheKey = fmt.Sprintf("onedrive_source_file_%d_%d", file.UpdatedAt.Unix(), file.ID)
// 如果是永久链接,则返回签名后的中转外链
if ttl == 0 {
signedURI, err := auth.SignURI(
auth.General,
fmt.Sprintf("/api/v3/file/source/%d/%s", file.ID, file.Name),
ttl,
)
if err != nil {
return "", err
}
return baseURL.ResolveReference(signedURI).String(), nil
}
}
// 尝试从缓存中查找

@ -3,7 +3,6 @@ package onedrive
import (
"context"
"fmt"
"github.com/cloudreve/Cloudreve/v3/pkg/auth"
"github.com/cloudreve/Cloudreve/v3/pkg/mq"
"github.com/cloudreve/Cloudreve/v3/pkg/serializer"
"github.com/jinzhu/gorm"
@ -161,21 +160,6 @@ func TestDriver_Source(t *testing.T) {
asserts.NoError(err)
asserts.Equal("123321", res)
}
// 成功 永久直链
{
file := model.File{}
file.ID = 1
file.Name = "123.jpg"
file.UpdatedAt = time.Now()
ctx := context.WithValue(context.Background(), fsctx.FileModelCtx, file)
handler.Client.Credential.ExpiresIn = time.Now().Add(time.Duration(100) * time.Hour).Unix()
auth.General = auth.HMACAuth{}
handler.Client.Credential.AccessToken = "1"
res, err := handler.Source(ctx, "123.jpg", url.URL{}, 0, true, 0)
asserts.NoError(err)
asserts.Contains(res, "/api/v3/file/source/1/123.jpg?sign")
}
}
func TestDriver_List(t *testing.T) {

@ -152,7 +152,7 @@ func (client *Client) UpdateCredential(ctx context.Context, isSlave bool) error
// 获取新的凭证
if client.Credential == nil || client.Credential.RefreshToken == "" {
// 无有效的RefreshToken
util.Log().Error("上传策略[%s]凭证刷新失败请重新授权OneDrive账号", client.Policy.Name)
util.Log().Error("Failed to refresh credential for policy %q, please login your Microsoft account again.", client.Policy.Name)
return ErrInvalidRefreshToken
}

@ -133,3 +133,8 @@ type Site struct {
func init() {
gob.Register(Credential{})
}
// Error 实现error接口
func (err RespError) Error() string {
return err.APIError.Message
}

@ -38,7 +38,7 @@ func GetPublicKey(r *http.Request) ([]byte, error) {
// 确保这个 public key 是由 OSS 颁发的
if !strings.HasPrefix(string(pubURL), "http://gosspublic.alicdn.com/") &&
!strings.HasPrefix(string(pubURL), "https://gosspublic.alicdn.com/") {
return pubKey, errors.New("公钥URL无效")
return pubKey, errors.New("public key url invalid")
}
// 获取公钥

@ -91,7 +91,7 @@ func (handler *Driver) CORS() error {
// InitOSSClient 初始化OSS鉴权客户端
func (handler *Driver) InitOSSClient(forceUsePublicEndpoint bool) error {
if handler.Policy == nil {
return errors.New("存储策略为空")
return errors.New("empty policy")
}
// 决定是否使用内网 Endpoint
@ -286,7 +286,7 @@ func (handler *Driver) Delete(ctx context.Context, files []string) ([]string, er
// 统计未删除的文件
failed := util.SliceDifference(files, delRes.DeletedObjects)
if len(failed) > 0 {
return failed, errors.New("删除失败")
return failed, errors.New("failed to delete")
}
return []string{}, nil
@ -304,7 +304,7 @@ func (handler *Driver) Thumb(ctx context.Context, path string) (*response.Conten
ok = false
)
if thumbSize, ok = ctx.Value(fsctx.ThumbSizeCtx).([2]uint); !ok {
return nil, errors.New("无法获取缩略图尺寸设置")
return nil, errors.New("failed to get thumbnail size")
}
thumbParam := fmt.Sprintf("image/resize,m_lfit,h_%d,w_%d", thumbSize[1], thumbSize[0])

@ -197,7 +197,7 @@ func (handler *Driver) Delete(ctx context.Context, files []string) ([]string, er
return failedResp.Files, errors.New(reqResp.Error)
}
}
return files, errors.New("未知的返回结果格式")
return files, errors.New("unknown format of returned response")
}
return []string{}, nil
@ -265,7 +265,7 @@ func (handler *Driver) Source(
)
if err != nil {
return "", serializer.NewError(serializer.CodeEncryptError, "无法对URL进行签名", err)
return "", serializer.NewError(serializer.CodeEncryptError, "Failed to sign URL", err)
}
finalURL := serverURL.ResolveReference(signedURI).String()

@ -62,7 +62,7 @@ func NewDriver(policy *model.Policy) (*Driver, error) {
// InitS3Client 初始化S3会话
func (handler *Driver) InitS3Client() error {
if handler.Policy == nil {
return errors.New("存储策略为空")
return errors.New("empty policy")
}
if handler.svc == nil {
@ -407,6 +407,7 @@ func (handler *Driver) Meta(ctx context.Context, path string) (*MetaData, error)
if err != nil {
return nil, err
}
defer res.Body.Close()
return &MetaData{
Size: uint64(*res.ContentLength),

@ -5,6 +5,9 @@ import (
"context"
"encoding/json"
"errors"
"net/url"
"time"
model "github.com/cloudreve/Cloudreve/v3/models"
"github.com/cloudreve/Cloudreve/v3/pkg/cluster"
"github.com/cloudreve/Cloudreve/v3/pkg/filesystem/driver"
@ -13,8 +16,6 @@ import (
"github.com/cloudreve/Cloudreve/v3/pkg/mq"
"github.com/cloudreve/Cloudreve/v3/pkg/request"
"github.com/cloudreve/Cloudreve/v3/pkg/serializer"
"net/url"
"time"
)
// Driver 影子存储策略,将上传任务指派给从机节点处理,并等待从机通知上传结果
@ -118,6 +119,6 @@ func (d *Driver) List(ctx context.Context, path string, recursive bool) ([]respo
}
// 取消上传凭证
func (handler Driver) CancelToken(ctx context.Context, uploadSession *serializer.UploadSession) error {
func (d *Driver) CancelToken(ctx context.Context, uploadSession *serializer.UploadSession) error {
return nil
}

@ -226,7 +226,7 @@ func (handler Driver) Thumb(ctx context.Context, path string) (*response.Content
ok = false
)
if thumbSize, ok = ctx.Value(fsctx.ThumbSizeCtx).([2]uint); !ok {
return nil, errors.New("无法获取缩略图尺寸设置")
return nil, errors.New("failed to get thumbnail size")
}
thumbParam := fmt.Sprintf("!/fwfh/%dx%d", thumbSize[0], thumbSize[1])

@ -8,17 +8,17 @@ import (
var (
ErrUnknownPolicyType = serializer.NewError(serializer.CodeInternalSetting, "Unknown policy type", nil)
ErrFileSizeTooBig = serializer.NewError(serializer.CodeFileTooLarge, "", nil)
ErrFileExtensionNotAllowed = serializer.NewError(serializer.CodeFileTypeNotAllowed, "", nil)
ErrInsufficientCapacity = serializer.NewError(serializer.CodeInsufficientCapacity, "", nil)
ErrIllegalObjectName = serializer.NewError(serializer.CodeIllegalObjectName, "", nil)
ErrFileSizeTooBig = serializer.NewError(serializer.CodeFileTooLarge, "File is too large", nil)
ErrFileExtensionNotAllowed = serializer.NewError(serializer.CodeFileTypeNotAllowed, "File type not allowed", nil)
ErrInsufficientCapacity = serializer.NewError(serializer.CodeInsufficientCapacity, "Insufficient capacity", nil)
ErrIllegalObjectName = serializer.NewError(serializer.CodeIllegalObjectName, "Invalid object name", nil)
ErrClientCanceled = errors.New("Client canceled operation")
ErrRootProtected = serializer.NewError(serializer.CodeRootProtected, "", nil)
ErrRootProtected = serializer.NewError(serializer.CodeRootProtected, "Root protected", nil)
ErrInsertFileRecord = serializer.NewError(serializer.CodeDBError, "Failed to create file record", nil)
ErrFileExisted = serializer.NewError(serializer.CodeObjectExist, "", nil)
ErrFileUploadSessionExisted = serializer.NewError(serializer.CodeConflictUploadOngoing, "", nil)
ErrPathNotExist = serializer.NewError(serializer.CodeParentNotExist, "", nil)
ErrObjectNotExist = serializer.NewError(serializer.CodeParentNotExist, "", nil)
ErrFileExisted = serializer.NewError(serializer.CodeObjectExist, "Object existed", nil)
ErrFileUploadSessionExisted = serializer.NewError(serializer.CodeConflictUploadOngoing, "Upload session existed", nil)
ErrPathNotExist = serializer.NewError(serializer.CodeParentNotExist, "Path not exist", nil)
ErrObjectNotExist = serializer.NewError(serializer.CodeParentNotExist, "Object not exist", nil)
ErrIO = serializer.NewError(serializer.CodeIOFailed, "Failed to read file data", nil)
ErrDBListObjects = serializer.NewError(serializer.CodeDBError, "Failed to list object records", nil)
ErrDBDeleteObjects = serializer.NewError(serializer.CodeDBError, "Failed to delete object records", nil)

@ -72,7 +72,7 @@ func (fs *FileSystem) AddFile(ctx context.Context, parent *model.Folder, file fs
if err != nil {
if err := fs.Trigger(ctx, "AfterValidateFailed", file); err != nil {
util.Log().Debug("AfterValidateFailed 钩子执行失败,%s", err)
util.Log().Debug("AfterValidateFailed hook execution failed: %s", err)
}
return nil, ErrFileExisted.WithError(err)
}
@ -203,7 +203,7 @@ func (fs *FileSystem) deleteGroupedFile(ctx context.Context, files map[uint][]*m
// 取消上传会话
for _, upSession := range uploadSessions {
if err := fs.Handler.CancelToken(ctx, upSession); err != nil {
util.Log().Warning("无法取消 [%s] 的上传会话: %s", upSession.Name, err)
util.Log().Warning("Failed to cancel upload session for %q: %s", upSession.Name, err)
}
cache.Deletes([]string{upSession.Key}, UploadSessionCachePrefix)
@ -270,14 +270,14 @@ func (fs *FileSystem) GetSource(ctx context.Context, fileID uint) (string, error
if !fs.Policy.IsOriginLinkEnable {
return "", serializer.NewError(
serializer.CodePolicyNotAllowed,
"当前存储策略无法获得外链",
"This policy is not enabled for getting source link",
nil,
)
}
source, err := fs.SignURL(ctx, &fs.FileTarget[0], 0, false)
if err != nil {
return "", serializer.NewError(serializer.CodeNotSet, "无法获取外链", err)
return "", serializer.NewError(serializer.CodeNotSet, "Failed to get source link", err)
}
return source, nil
@ -298,7 +298,7 @@ func (fs *FileSystem) SignURL(ctx context.Context, file *model.File, ttl int64,
siteURL := model.GetSiteURL()
source, err := fs.Handler.Source(ctx, fs.FileTarget[0].SourceName, *siteURL, ttl, isDownload, fs.User.Group.SpeedLimit)
if err != nil {
return "", serializer.NewError(serializer.CodeNotSet, "无法获取外链", err)
return "", serializer.NewError(serializer.CodeNotSet, "Failed to get source link", err)
}
return source, nil

@ -203,7 +203,7 @@ func NewFileSystemFromCallback(c *gin.Context) (*FileSystem, error) {
// 获取回调会话
callbackSessionRaw, ok := c.Get(UploadSessionCtx)
if !ok {
return nil, errors.New("找不到回调会话")
return nil, errors.New("upload session not exist")
}
callbackSession := callbackSessionRaw.(*serializer.UploadSession)

@ -44,7 +44,7 @@ func (fs *FileSystem) Trigger(ctx context.Context, name string, file fsctx.FileH
for _, hook := range hooks {
err := hook(ctx, fs, file)
if err != nil {
util.Log().Warning("钩子执行失败%s", err)
util.Log().Warning("Failed to execute hook%s", err)
return err
}
}
@ -112,7 +112,7 @@ func HookDeleteTempFile(ctx context.Context, fs *FileSystem, file fsctx.FileHead
// 删除临时文件
_, err := fs.Handler.Delete(ctx, []string{file.Info().SavePath})
if err != nil {
util.Log().Warning("无法清理上传临时文件,%s", err)
util.Log().Warning("Failed to clean-up temp files: %s", err)
}
return nil

@ -71,16 +71,16 @@ func getThumbWorker() *Pool {
thumbPool = &Pool{
worker: make(chan int, maxWorker),
}
util.Log().Debug("初始化Thumb任务队列WorkerNum = %d", maxWorker)
util.Log().Debug("Initialize thumbnails task queue with: WorkerNum = %d", maxWorker)
})
return thumbPool
}
func (pool *Pool) addWorker() {
pool.worker <- 1
util.Log().Debug("Thumb任务队列addWorker")
util.Log().Debug("Worker added to thumbnails task queue.")
}
func (pool *Pool) releaseWorker() {
util.Log().Debug("Thumb任务队列releaseWorker")
util.Log().Debug("Worker released from thumbnails task queue.")
<-pool.worker
}
@ -107,7 +107,7 @@ func (fs *FileSystem) GenerateThumbnail(ctx context.Context, file *model.File) {
image, err := thumb.NewThumbFromFile(source, file.Name)
if err != nil {
util.Log().Warning("生成缩略图时无法解析 [%s] 图像数据:%s", file.SourceName, err)
util.Log().Warning("Cannot generate thumb because of failed to parse image %q: %s", file.SourceName, err)
return
}
@ -125,7 +125,7 @@ func (fs *FileSystem) GenerateThumbnail(ctx context.Context, file *model.File) {
}
if err != nil {
util.Log().Warning("无法保存缩略图:%s", err)
util.Log().Warning("Failed to save thumb: %s", err)
return
}

@ -472,6 +472,9 @@ func TestFileSystem_Delete(t *testing.T) {
AddRow(4, "1.txt", "1.txt", 365, 1),
)
mock.ExpectQuery("SELECT(.+)").WillReturnRows(sqlmock.NewRows([]string{"id", "name", "source_name", "policy_id", "size"}).AddRow(1, "2.txt", "2.txt", 365, 2))
// 两次查询软连接
mock.ExpectQuery("SELECT(.+)files(.+)").
WillReturnRows(sqlmock.NewRows([]string{"id", "policy_id", "source_name"}))
mock.ExpectQuery("SELECT(.+)files(.+)").
WillReturnRows(sqlmock.NewRows([]string{"id", "policy_id", "source_name"}))
// 查询上传策略
@ -527,6 +530,9 @@ func TestFileSystem_Delete(t *testing.T) {
AddRow(4, "1.txt", "1.txt", 602, 1),
)
mock.ExpectQuery("SELECT(.+)").WillReturnRows(sqlmock.NewRows([]string{"id", "name", "source_name", "policy_id", "size"}).AddRow(1, "2.txt", "2.txt", 602, 2))
// 两次查询软连接
mock.ExpectQuery("SELECT(.+)files(.+)").
WillReturnRows(sqlmock.NewRows([]string{"id", "policy_id", "source_name"}))
mock.ExpectQuery("SELECT(.+)files(.+)").
WillReturnRows(sqlmock.NewRows([]string{"id", "policy_id", "source_name"}))
// 查询上传策略

@ -69,7 +69,7 @@ func (fs *FileSystem) Upload(ctx context.Context, file *fsctx.FileStream) (err e
followUpErr := fs.Trigger(ctx, "AfterValidateFailed", file)
// 失败后再失败...
if followUpErr != nil {
util.Log().Debug("AfterValidateFailed 钩子执行失败,%s", followUpErr)
util.Log().Debug("AfterValidateFailed hook execution failed: %s", followUpErr)
}
return err
@ -113,13 +113,13 @@ func (fs *FileSystem) CancelUpload(ctx context.Context, path string, file fsctx.
// 客户端正常关闭,不执行操作
default:
// 客户端取消上传,删除临时文件
util.Log().Debug("客户端取消上传")
util.Log().Debug("Client canceled upload.")
if fs.Hooks["AfterUploadCanceled"] == nil {
return
}
err := fs.Trigger(ctx, "AfterUploadCanceled", file)
if err != nil {
util.Log().Debug("执行 AfterUploadCanceled 钩子出错,%s", err)
util.Log().Debug("AfterUploadCanceled hook execution failed: %s", err)
}
}

@ -15,11 +15,12 @@ const (
FolderID // 目录ID
TagID // 标签ID
PolicyID // 存储策略ID
SourceLinkID
)
var (
// ErrTypeNotMatch ID类型不匹配
ErrTypeNotMatch = errors.New("ID类型不匹配")
ErrTypeNotMatch = errors.New("mismatched ID type.")
)
// HashEncode 对给定数据计算HashID

@ -44,6 +44,12 @@ func newDefaultOption() *options {
}
}
func (o *options) clone() options {
newOptions := *o
newOptions.header = o.header.Clone()
return newOptions
}
// WithTimeout 设置请求超时
func WithTimeout(t time.Duration) Option {
return optionFunc(func(o *options) {

@ -56,7 +56,7 @@ func NewClient(opts ...Option) Client {
func (c *HTTPClient) Request(method, target string, body io.Reader, opts ...Option) *Response {
// 应用额外设置
c.mu.Lock()
options := *c.options
options := c.options.clone()
c.mu.Unlock()
for _, o := range opts {
o.apply(&options)
@ -179,7 +179,7 @@ func (resp *Response) DecodeResponse() (*serializer.Response, error) {
var res serializer.Response
err = json.Unmarshal([]byte(respString), &res)
if err != nil {
util.Log().Debug("无法解析回调服务端响应:%s", string(respString))
util.Log().Debug("Failed to parse response: %s", string(respString))
return nil, err
}
return &res, nil
@ -251,7 +251,7 @@ func (instance NopRSCloser) Seek(offset int64, whence int) (int64, error) {
return instance.status.Size, nil
}
}
return 0, errors.New("未实现")
return 0, errors.New("not implemented")
}

@ -19,6 +19,7 @@ type DownloadListResponse struct {
Downloaded uint64 `json:"downloaded"`
Speed int `json:"speed"`
Info rpc.StatusInfo `json:"info"`
NodeName string `json:"node"`
}
// FinishedListResponse 已完成任务条目
@ -34,6 +35,7 @@ type FinishedListResponse struct {
TaskError string `json:"task_error"`
CreateTime time.Time `json:"create"`
UpdateTime time.Time `json:"update"`
NodeName string `json:"node"`
}
// BuildFinishedListResponse 构建已完成任务条目
@ -62,6 +64,7 @@ func BuildFinishedListResponse(tasks []model.Download) Response {
TaskStatus: -1,
UpdateTime: tasks[i].UpdatedAt,
CreateTime: tasks[i].CreatedAt,
NodeName: tasks[i].NodeName,
}
if tasks[i].Task != nil {
@ -106,6 +109,7 @@ func BuildDownloadingResponse(tasks []model.Download, intervals map[uint]int) Re
Downloaded: tasks[i].DownloadedSize,
Speed: tasks[i].Speed,
Info: tasks[i].StatusInfo,
NodeName: tasks[i].NodeName,
})
}

@ -172,6 +172,26 @@ const (
CodeSlavePingMaster = 40060
// Cloudreve 版本不一致
CodeVersionMismatch = 40061
// 积分不足
CodeInsufficientCredit = 40062
// 用户组冲突
CodeGroupConflict = 40063
// 当前已处于此用户组中
CodeGroupInvalid = 40064
// 兑换码无效
CodeInvalidGiftCode = 40065
// 已绑定了QQ账号
CodeQQBindConflict = 40066
// QQ账号已被绑定其他账号
CodeQQBindOtherAccount = 40067
// QQ 未绑定对应账号
CodeQQNotLinked = 40068
// 密码不正确
CodeIncorrectPassword = 40069
// 分享无法预览
CodeDisabledSharePreview = 40070
// 签名无效
CodeInvalidSign = 40071
// CodeDBError 数据库操作失败
CodeDBError = 50001
// CodeEncryptError 加密失败
@ -201,7 +221,7 @@ const (
// DBErr 数据库操作失败
func DBErr(msg string, err error) Response {
if msg == "" {
msg = "数据库操作失败"
msg = "Database operation failed."
}
return Err(CodeDBError, msg, err)
}
@ -209,7 +229,7 @@ func DBErr(msg string, err error) Response {
// ParamErr 各种参数错误
func ParamErr(msg string, err error) Response {
if msg == "" {
msg = "参数错误"
msg = "Invalid parameters."
}
return Err(CodeParamErr, msg, err)
}

@ -19,7 +19,7 @@ func NewResponseWithGobData(data interface{}) Response {
var w bytes.Buffer
encoder := gob.NewEncoder(&w)
if err := encoder.Encode(data); err != nil {
return Err(CodeInternalSetting, "无法编码返回结果", err)
return Err(CodeInternalSetting, "Failed to encode response content", err)
}
return Response{Data: w.Bytes()}

@ -4,6 +4,7 @@ import (
"crypto/sha1"
"encoding/gob"
"fmt"
model "github.com/cloudreve/Cloudreve/v3/models"
)

@ -1,9 +1,10 @@
package serializer
import (
"testing"
model "github.com/cloudreve/Cloudreve/v3/models"
"github.com/stretchr/testify/assert"
"testing"
)
func TestSlaveTransferReq_Hash(t *testing.T) {

@ -13,7 +13,7 @@ import (
func CheckLogin() Response {
return Response{
Code: CodeCheckLogin,
Msg: "未登录",
Msg: "Login required",
}
}

@ -69,7 +69,7 @@ func (job *CompressTask) SetError(err *JobError) {
func (job *CompressTask) removeZipFile() {
if job.zipPath != "" {
if err := os.Remove(job.zipPath); err != nil {
util.Log().Warning("无法删除临时压缩文件 %s , %s", job.zipPath, err)
util.Log().Warning("Failed to delete temp zip file %q: %s", job.zipPath, err)
}
}
}
@ -93,7 +93,7 @@ func (job *CompressTask) Do() {
return
}
util.Log().Debug("开始压缩文件")
util.Log().Debug("Starting compress file...")
job.TaskModel.SetProgress(CompressingProgress)
// 创建临时压缩文件
@ -122,7 +122,7 @@ func (job *CompressTask) Do() {
job.zipPath = zipFilePath
zipFile.Close()
util.Log().Debug("压缩文件存放至%s开始上传", zipFilePath)
util.Log().Debug("Compressed file saved to %q, start uploading it...", zipFilePath)
job.TaskModel.SetProgress(TransferringProgress)
// 上传文件

@ -77,7 +77,7 @@ func (job *DecompressTask) Do() {
// 创建文件系统
fs, err := filesystem.NewFileSystem(job.User)
if err != nil {
job.SetErrorMsg("无法创建文件系统", err)
job.SetErrorMsg("Failed to create filesystem.", err)
return
}
@ -85,7 +85,7 @@ func (job *DecompressTask) Do() {
err = fs.Decompress(context.Background(), job.TaskProps.Src, job.TaskProps.Dst, job.TaskProps.Encoding)
if err != nil {
job.SetErrorMsg("解压缩失败", err)
job.SetErrorMsg("Failed to decompress file.", err)
return
}

@ -4,5 +4,5 @@ import "errors"
var (
// ErrUnknownTaskType 未知任务类型
ErrUnknownTaskType = errors.New("未知任务类型")
ErrUnknownTaskType = errors.New("unknown task type")
)

@ -81,7 +81,7 @@ func (job *ImportTask) Do() {
// 查找存储策略
policy, err := model.GetPolicyByID(job.TaskProps.PolicyID)
if err != nil {
job.SetErrorMsg("找不到存储策略", err)
job.SetErrorMsg("Policy not exist.", err)
return
}
@ -96,7 +96,7 @@ func (job *ImportTask) Do() {
fs.Policy = &policy
if err := fs.DispatchHandler(); err != nil {
job.SetErrorMsg("无法分发存储策略", err)
job.SetErrorMsg("Failed to dispatch policy.", err)
return
}
@ -110,7 +110,7 @@ func (job *ImportTask) Do() {
true)
objects, err := fs.Handler.List(ctx, job.TaskProps.Src, job.TaskProps.Recursive)
if err != nil {
job.SetErrorMsg("无法列取文件", err)
job.SetErrorMsg("Failed to list files.", err)
return
}
@ -126,7 +126,7 @@ func (job *ImportTask) Do() {
virtualPath := path.Join(job.TaskProps.Dst, object.RelativePath)
folder, err := fs.CreateDirectory(coxIgnoreConflict, virtualPath)
if err != nil {
util.Log().Warning("导入任务无法创建用户目录[%s], %s", virtualPath, err)
util.Log().Warning("Importing task cannot create user directory %q: %s", virtualPath, err)
} else if folder.ID > 0 {
pathCache[virtualPath] = folder
}
@ -152,7 +152,7 @@ func (job *ImportTask) Do() {
} else {
folder, err := fs.CreateDirectory(context.Background(), virtualPath)
if err != nil {
util.Log().Warning("导入任务无法创建用户目录[%s], %s",
util.Log().Warning("Importing task cannot create user directory %q: %s",
virtualPath, err)
continue
}
@ -163,10 +163,10 @@ func (job *ImportTask) Do() {
// 插入文件记录
_, err := fs.AddFile(context.Background(), parentFolder, &fileHeader)
if err != nil {
util.Log().Warning("导入任务无法创插入文件[%s], %s",
util.Log().Warning("Importing task cannot insert user file %q: %s",
object.RelativePath, err)
if err == filesystem.ErrInsufficientCapacity {
job.SetErrorMsg("容量不足", err)
job.SetErrorMsg("Insufficient storage capacity.", err)
return
}
}

@ -15,6 +15,8 @@ const (
TransferTaskType
// ImportTaskType 导入任务
ImportTaskType
// RecycleTaskType 回收任务
RecycleTaskType
)
// 任务状态
@ -87,12 +89,12 @@ func Resume(p Pool) {
if len(tasks) == 0 {
return
}
util.Log().Info("从数据库中恢复 %d 个未完成任务", len(tasks))
util.Log().Info("Resume %d unfinished task(s) from database.", len(tasks))
for i := 0; i < len(tasks); i++ {
job, err := GetJobFromModel(&tasks[i])
if err != nil {
util.Log().Warning("无法恢复任务,%s", err)
util.Log().Warning("Failed to resume task: %s", err)
continue
}
@ -113,6 +115,8 @@ func GetJobFromModel(task *model.Task) (Job, error) {
return NewTransferTaskFromModel(task)
case ImportTaskType:
return NewImportTaskFromModel(task)
case RecycleTaskType:
return NewRecycleTaskFromModel(task)
default:
return nil, ErrUnknownTaskType
}

@ -2,12 +2,12 @@ package task
import (
"errors"
testMock "github.com/stretchr/testify/mock"
"testing"
"github.com/DATA-DOG/go-sqlmock"
model "github.com/cloudreve/Cloudreve/v3/models"
"github.com/stretchr/testify/assert"
testMock "github.com/stretchr/testify/mock"
)
func TestRecord(t *testing.T) {
@ -103,4 +103,16 @@ func TestGetJobFromModel(t *testing.T) {
asserts.Nil(job)
asserts.Error(err)
}
// RecycleTaskType
{
task := &model.Task{
Status: 0,
Type: RecycleTaskType,
}
mock.ExpectQuery("SELECT(.+)users(.+)").WillReturnError(errors.New("error"))
job, err := GetJobFromModel(task)
asserts.NoError(mock.ExpectationsWereMet())
asserts.Nil(job)
asserts.Error(err)
}
}

@ -44,11 +44,11 @@ func (pool *AsyncPool) freeWorker() {
// Submit 开始提交任务
func (pool *AsyncPool) Submit(job Job) {
go func() {
util.Log().Debug("等待获取Worker")
util.Log().Debug("Waiting for Worker.")
worker := pool.obtainWorker()
util.Log().Debug("获取到Worker")
util.Log().Debug("Worker obtained.")
worker.Do(job)
util.Log().Debug("释放Worker")
util.Log().Debug("Worker released.")
pool.freeWorker()
}()
}
@ -60,7 +60,7 @@ func Init() {
idleWorker: make(chan int, maxWorker),
}
TaskPoll.Add(maxWorker)
util.Log().Info("初始化任务队列,WorkerNum = %d", maxWorker)
util.Log().Info("Initialize task queue with WorkerNum = %d", maxWorker)
if conf.SystemConfig.Mode == "master" {
Resume(TaskPoll)

@ -0,0 +1,130 @@
package task
import (
"encoding/json"
model "github.com/cloudreve/Cloudreve/v3/models"
"github.com/cloudreve/Cloudreve/v3/pkg/cluster"
"github.com/cloudreve/Cloudreve/v3/pkg/util"
)
// RecycleTask 文件回收任务
type RecycleTask struct {
User *model.User
TaskModel *model.Task
TaskProps RecycleProps
Err *JobError
}
// RecycleProps 回收任务属性
type RecycleProps struct {
// 下载任务 GID
DownloadGID string `json:"download_gid"`
}
// Props 获取任务属性
func (job *RecycleTask) Props() string {
res, _ := json.Marshal(job.TaskProps)
return string(res)
}
// Type 获取任务状态
func (job *RecycleTask) Type() int {
return RecycleTaskType
}
// Creator 获取创建者ID
func (job *RecycleTask) Creator() uint {
return job.User.ID
}
// Model 获取任务的数据库模型
func (job *RecycleTask) Model() *model.Task {
return job.TaskModel
}
// SetStatus 设定状态
func (job *RecycleTask) SetStatus(status int) {
job.TaskModel.SetStatus(status)
}
// SetError 设定任务失败信息
func (job *RecycleTask) SetError(err *JobError) {
job.Err = err
res, _ := json.Marshal(job.Err)
job.TaskModel.SetError(string(res))
}
// SetErrorMsg 设定任务失败信息
func (job *RecycleTask) SetErrorMsg(msg string, err error) {
jobErr := &JobError{Msg: msg}
if err != nil {
jobErr.Error = err.Error()
}
job.SetError(jobErr)
}
// GetError 返回任务失败信息
func (job *RecycleTask) GetError() *JobError {
return job.Err
}
// Do 开始执行任务
func (job *RecycleTask) Do() {
download, err := model.GetDownloadByGid(job.TaskProps.DownloadGID, job.User.ID)
if err != nil {
util.Log().Warning("Recycle task %d cannot found download record.", job.TaskModel.ID)
job.SetErrorMsg("Cannot found download task.", err)
return
}
nodeID := download.GetNodeID()
node := cluster.Default.GetNodeByID(nodeID)
if node == nil {
util.Log().Warning("Recycle task %d cannot found node.", job.TaskModel.ID)
job.SetErrorMsg("Invalid slave node.", nil)
return
}
err = node.GetAria2Instance().DeleteTempFile(download)
if err != nil {
util.Log().Warning("Failed to delete transfer temp folder %q: %s", download.Parent, err)
job.SetErrorMsg("Failed to recycle files.", err)
return
}
}
// NewRecycleTask 新建回收任务
func NewRecycleTask(download *model.Download) (Job, error) {
newTask := &RecycleTask{
User: download.GetOwner(),
TaskProps: RecycleProps{
DownloadGID: download.GID,
},
}
record, err := Record(newTask)
if err != nil {
return nil, err
}
newTask.TaskModel = record
return newTask, nil
}
// NewRecycleTaskFromModel 从数据库记录中恢复回收任务
func NewRecycleTaskFromModel(task *model.Task) (Job, error) {
user, err := model.GetActiveUserByID(task.UserID)
if err != nil {
return nil, err
}
newTask := &RecycleTask{
User: &user,
TaskModel: task,
}
err = json.Unmarshal([]byte(task.Props), &newTask.TaskProps)
if err != nil {
return nil, err
}
return newTask, nil
}

@ -0,0 +1,117 @@
package task
import (
"errors"
"testing"
"github.com/DATA-DOG/go-sqlmock"
model "github.com/cloudreve/Cloudreve/v3/models"
"github.com/jinzhu/gorm"
"github.com/stretchr/testify/assert"
)
func TestRecycleTask_Props(t *testing.T) {
asserts := assert.New(t)
task := &RecycleTask{
User: &model.User{},
}
asserts.NotEmpty(task.Props())
asserts.Equal(RecycleTaskType, task.Type())
asserts.EqualValues(0, task.Creator())
asserts.Nil(task.Model())
}
func TestRecycleTask_SetStatus(t *testing.T) {
asserts := assert.New(t)
task := &RecycleTask{
User: &model.User{},
TaskModel: &model.Task{
Model: gorm.Model{ID: 1},
},
}
mock.ExpectBegin()
mock.ExpectExec("UPDATE(.+)").WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectCommit()
task.SetStatus(3)
asserts.NoError(mock.ExpectationsWereMet())
}
func TestRecycleTask_SetError(t *testing.T) {
asserts := assert.New(t)
task := &RecycleTask{
User: &model.User{},
TaskModel: &model.Task{
Model: gorm.Model{ID: 1},
},
}
mock.ExpectBegin()
mock.ExpectExec("UPDATE(.+)").WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectCommit()
task.SetErrorMsg("error", nil)
asserts.NoError(mock.ExpectationsWereMet())
asserts.Equal("error", task.GetError().Msg)
}
func TestNewRecycleTask(t *testing.T) {
asserts := assert.New(t)
// 成功
{
mock.ExpectQuery("SELECT(.+)").WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow(1))
mock.ExpectBegin()
mock.ExpectExec("INSERT(.+)").WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectCommit()
job, err := NewRecycleTask(&model.Download{
Model: gorm.Model{ID: 1},
GID: "test_g_id",
Parent: "/",
UserID: 1,
NodeID: 1,
})
asserts.NoError(mock.ExpectationsWereMet())
asserts.NotNil(job)
asserts.NoError(err)
}
// 失败
{
mock.ExpectQuery("SELECT(.+)").WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow(1))
mock.ExpectBegin()
mock.ExpectExec("INSERT(.+)").WillReturnError(errors.New("error"))
mock.ExpectRollback()
job, err := NewRecycleTask(&model.Download{
Model: gorm.Model{ID: 1},
GID: "test_g_id",
Parent: "test/not_exist",
UserID: 1,
NodeID: 1,
})
asserts.NoError(mock.ExpectationsWereMet())
asserts.Nil(job)
asserts.Error(err)
}
}
func TestNewRecycleTaskFromModel(t *testing.T) {
asserts := assert.New(t)
// 成功
{
mock.ExpectQuery("SELECT(.+)").WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow(1))
job, err := NewRecycleTaskFromModel(&model.Task{Props: "{}"})
asserts.NoError(mock.ExpectationsWereMet())
asserts.NoError(err)
asserts.NotNil(job)
}
// JSON解析失败
{
mock.ExpectQuery("SELECT(.+)").WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow(1))
job, err := NewRecycleTaskFromModel(&model.Task{Props: "?"})
asserts.NoError(mock.ExpectationsWereMet())
asserts.Error(err)
asserts.Nil(job)
}
}

@ -2,6 +2,8 @@ package slavetask
import (
"context"
"os"
model "github.com/cloudreve/Cloudreve/v3/models"
"github.com/cloudreve/Cloudreve/v3/pkg/cluster"
"github.com/cloudreve/Cloudreve/v3/pkg/filesystem"
@ -10,7 +12,6 @@ import (
"github.com/cloudreve/Cloudreve/v3/pkg/serializer"
"github.com/cloudreve/Cloudreve/v3/pkg/task"
"github.com/cloudreve/Cloudreve/v3/pkg/util"
"os"
)
// TransferTask 文件中转任务
@ -68,7 +69,7 @@ func (job *TransferTask) SetErrorMsg(msg string, err error) {
}
if err := cluster.DefaultController.SendNotification(job.MasterID, job.Req.Hash(job.MasterID), notifyMsg); err != nil {
util.Log().Warning("无法发送转存失败通知到从机, %s", err)
util.Log().Warning("Failed to send transfer failure notification to master node: %s", err)
}
}
@ -79,30 +80,28 @@ func (job *TransferTask) GetError() *task.JobError {
// Do 开始执行任务
func (job *TransferTask) Do() {
defer job.Recycle()
fs, err := filesystem.NewAnonymousFileSystem()
if err != nil {
job.SetErrorMsg("无法初始化匿名文件系统", err)
job.SetErrorMsg("Failed to initialize anonymous filesystem.", err)
return
}
fs.Policy = job.Req.Policy
if err := fs.DispatchHandler(); err != nil {
job.SetErrorMsg("无法分发存储策略", err)
job.SetErrorMsg("Failed to dispatch policy.", err)
return
}
master, err := cluster.DefaultController.GetMasterInfo(job.MasterID)
if err != nil {
job.SetErrorMsg("找不到主机节点", err)
job.SetErrorMsg("Cannot found master node ID.", err)
return
}
fs.SwitchToShadowHandler(master.Instance, master.URL.String(), master.ID)
file, err := os.Open(util.RelativePath(job.Req.Src))
if err != nil {
job.SetErrorMsg("无法读取源文件", err)
job.SetErrorMsg("Failed to read source file.", err)
return
}
@ -111,7 +110,7 @@ func (job *TransferTask) Do() {
// 获取源文件大小
fi, err := file.Stat()
if err != nil {
job.SetErrorMsg("无法获取源文件大小", err)
job.SetErrorMsg("Failed to get source file size.", err)
return
}
@ -123,7 +122,7 @@ func (job *TransferTask) Do() {
Size: uint64(size),
})
if err != nil {
job.SetErrorMsg("文件上传失败", err)
job.SetErrorMsg("Upload failed.", err)
return
}
@ -134,14 +133,6 @@ func (job *TransferTask) Do() {
}
if err := cluster.DefaultController.SendNotification(job.MasterID, job.Req.Hash(job.MasterID), msg); err != nil {
util.Log().Warning("无法发送转存成功通知到从机, %s", err)
}
}
// Recycle 回收临时文件
func (job *TransferTask) Recycle() {
err := os.Remove(job.Req.Src)
if err != nil {
util.Log().Warning("无法删除中转临时文件[%s], %s", job.Req.Src, err)
util.Log().Warning("Failed to send transfer success notification to master node: %s", err)
}
}

Some files were not shown because too many files have changed in this diff Show More

Loading…
Cancel
Save