Feat: slave transfer file in OneDrive policy

pull/1040/head
HFO4 4 years ago
parent 5699c8a0f2
commit adaa3290dd

@ -37,6 +37,7 @@ type Policy struct {
// 数据库忽略字段 // 数据库忽略字段
OptionsSerialized PolicyOption `gorm:"-"` OptionsSerialized PolicyOption `gorm:"-"`
MasterID string `gorm:"-"`
} }
// PolicyOption 非公有的存储策略属性 // PolicyOption 非公有的存储策略属性
@ -277,6 +278,13 @@ func (policy *Policy) SaveAndClearCache() error {
return err return err
} }
// SaveAndClearCache 更新并清理缓存
func (policy *Policy) UpdateAccessKeyAndClearCache(s string) error {
err := DB.Model(policy).UpdateColumn("access_key", s).Error
policy.ClearCache()
return err
}
// ClearCache 清空policy缓存 // ClearCache 清空policy缓存
func (policy *Policy) ClearCache() { func (policy *Policy) ClearCache() {
cache.Deletes([]string{strconv.FormatUint(uint64(policy.ID), 10)}, "policy_") cache.Deletes([]string{strconv.FormatUint(uint64(policy.ID), 10)}, "policy_")

@ -14,6 +14,7 @@ import (
model "github.com/cloudreve/Cloudreve/v3/models" model "github.com/cloudreve/Cloudreve/v3/models"
"github.com/cloudreve/Cloudreve/v3/pkg/auth" "github.com/cloudreve/Cloudreve/v3/pkg/auth"
"github.com/cloudreve/Cloudreve/v3/pkg/cache" "github.com/cloudreve/Cloudreve/v3/pkg/cache"
"github.com/cloudreve/Cloudreve/v3/pkg/filesystem/driver"
"github.com/cloudreve/Cloudreve/v3/pkg/filesystem/fsctx" "github.com/cloudreve/Cloudreve/v3/pkg/filesystem/fsctx"
"github.com/cloudreve/Cloudreve/v3/pkg/filesystem/response" "github.com/cloudreve/Cloudreve/v3/pkg/filesystem/response"
"github.com/cloudreve/Cloudreve/v3/pkg/request" "github.com/cloudreve/Cloudreve/v3/pkg/request"
@ -27,6 +28,16 @@ type Driver struct {
HTTPClient request.Client HTTPClient request.Client
} }
// NewDriver 从存储策略初始化新的Driver实例
func NewDriver(policy *model.Policy) (driver.Handler, error) {
client, err := NewClient(policy)
return Driver{
Policy: policy,
Client: client,
HTTPClient: request.NewClient(),
}, err
}
// List 列取项目 // List 列取项目
func (handler Driver) List(ctx context.Context, base string, recursive bool) ([]response.Object, error) { func (handler Driver) List(ctx context.Context, base string, recursive bool) ([]response.Object, error) {
base = strings.TrimPrefix(base, "/") base = strings.TrimPrefix(base, "/")

@ -0,0 +1,25 @@
package onedrive
import "sync"
// CredentialLock 针对存储策略凭证的锁
type CredentialLock interface {
Lock(uint)
Unlock(uint)
}
var GlobalMutex = mutexMap{}
type mutexMap struct {
locks sync.Map
}
func (m *mutexMap) Lock(id uint) {
lock, _ := m.locks.LoadOrStore(id, &sync.Mutex{})
lock.(*sync.Mutex).Lock()
}
func (m *mutexMap) Unlock(id uint) {
lock, _ := m.locks.LoadOrStore(id, &sync.Mutex{})
lock.(*sync.Mutex).Unlock()
}

@ -10,7 +10,9 @@ import (
"time" "time"
"github.com/cloudreve/Cloudreve/v3/pkg/cache" "github.com/cloudreve/Cloudreve/v3/pkg/cache"
"github.com/cloudreve/Cloudreve/v3/pkg/conf"
"github.com/cloudreve/Cloudreve/v3/pkg/request" "github.com/cloudreve/Cloudreve/v3/pkg/request"
"github.com/cloudreve/Cloudreve/v3/pkg/slave"
"github.com/cloudreve/Cloudreve/v3/pkg/util" "github.com/cloudreve/Cloudreve/v3/pkg/util"
) )
@ -124,6 +126,13 @@ func (client *Client) ObtainToken(ctx context.Context, opts ...Option) (*Credent
// UpdateCredential 更新凭证,并检查有效期 // UpdateCredential 更新凭证,并检查有效期
func (client *Client) UpdateCredential(ctx context.Context) error { func (client *Client) UpdateCredential(ctx context.Context) error {
if conf.SystemConfig.Mode == "slave" {
return client.fetchCredentialFromMaster(ctx)
}
GlobalMutex.Lock(client.Policy.ID)
defer GlobalMutex.Unlock(client.Policy.ID)
// 如果已存在凭证 // 如果已存在凭证
if client.Credential != nil && client.Credential.AccessToken != "" { if client.Credential != nil && client.Credential.AccessToken != "" {
// 检查已有凭证是否过期 // 检查已有凭证是否过期
@ -160,11 +169,21 @@ func (client *Client) UpdateCredential(ctx context.Context) error {
client.Credential = credential client.Credential = credential
// 更新存储策略的 RefreshToken // 更新存储策略的 RefreshToken
client.Policy.AccessKey = credential.RefreshToken client.Policy.UpdateAccessKeyAndClearCache(credential.RefreshToken)
client.Policy.SaveAndClearCache()
// 更新缓存 // 更新缓存
cache.Set("onedrive_"+client.ClientID, *credential, int(expires)) cache.Set("onedrive_"+client.ClientID, *credential, int(expires))
return nil return nil
} }
// UpdateCredential 更新凭证,并检查有效期
func (client *Client) fetchCredentialFromMaster(ctx context.Context) error {
res, err := slave.DefaultController.GetOneDriveToken(client.Policy.MasterID, client.Policy.ID)
if err != nil {
return err
}
client.Credential = &Credential{AccessToken: res}
return nil
}

@ -176,13 +176,9 @@ func (fs *FileSystem) DispatchHandler() error {
} }
return nil return nil
case "onedrive": case "onedrive":
client, err := onedrive.NewClient(currentPolicy) var odErr error
fs.Handler = onedrive.Driver{ fs.Handler, odErr = onedrive.NewDriver(currentPolicy)
Policy: currentPolicy, return odErr
Client: client,
HTTPClient: request.NewClient(),
}
return err
case "cos": case "cos":
u, _ := url.Parse(currentPolicy.Server) u, _ := url.Parse(currentPolicy.Server)
b := &cossdk.BaseURL{BucketURL: u} b := &cossdk.BaseURL{BucketURL: u}
@ -249,17 +245,19 @@ func (fs *FileSystem) SwitchToSlaveHandler(node cluster.Node) {
} }
// SwitchToShadowHandler 将负责上传的 Handler 切换为从机节点转存使用的影子处理器 // SwitchToShadowHandler 将负责上传的 Handler 切换为从机节点转存使用的影子处理器
func (fs *FileSystem) SwitchToShadowHandler(master cluster.Node, masterURL string) { func (fs *FileSystem) SwitchToShadowHandler(master cluster.Node, masterURL, masterID string) {
// 交换主从存储策略 switch fs.Policy.Type {
if fs.Policy.Type == "remote" { case "remote":
fs.Policy.Type = "local" fs.Policy.Type = "local"
fs.DispatchHandler() fs.DispatchHandler()
} else if fs.Policy.Type == "local" { case "local":
fs.Policy.Type = "remote" fs.Policy.Type = "remote"
fs.Policy.Server = masterURL fs.Policy.Server = masterURL
fs.Policy.AccessKey = fmt.Sprintf("%d", master.ID()) fs.Policy.AccessKey = fmt.Sprintf("%d", master.ID())
fs.Policy.SecretKey = master.DBModel().MasterKey fs.Policy.SecretKey = master.DBModel().MasterKey
fs.DispatchHandler() fs.DispatchHandler()
case "onedrive":
fs.Policy.MasterID = masterID
} }
fs.Handler = masterinslave.NewDriver(master, fs.Handler, fs.Policy) fs.Handler = masterinslave.NewDriver(master, fs.Handler, fs.Policy)

@ -112,7 +112,14 @@ func (c HTTPClient) Request(method, target string, body io.Reader, opts ...Optio
// 签名请求 // 签名请求
if options.sign != nil { if options.sign != nil {
auth.SignRequest(options.sign, req, options.signTTL) switch method {
case "PUT", "POST", "PATCH":
auth.SignRequest(options.sign, req, options.signTTL)
default:
if resURL, err := auth.SignURI(options.sign, req.URL.String(), options.signTTL); err == nil {
req.URL = resURL
}
}
} }
// 发送请求 // 发送请求

@ -7,11 +7,11 @@ import (
model "github.com/cloudreve/Cloudreve/v3/models" model "github.com/cloudreve/Cloudreve/v3/models"
"github.com/cloudreve/Cloudreve/v3/pkg/aria2/common" "github.com/cloudreve/Cloudreve/v3/pkg/aria2/common"
"github.com/cloudreve/Cloudreve/v3/pkg/aria2/rpc" "github.com/cloudreve/Cloudreve/v3/pkg/aria2/rpc"
"github.com/cloudreve/Cloudreve/v3/pkg/auth"
"github.com/cloudreve/Cloudreve/v3/pkg/cluster" "github.com/cloudreve/Cloudreve/v3/pkg/cluster"
"github.com/cloudreve/Cloudreve/v3/pkg/mq" "github.com/cloudreve/Cloudreve/v3/pkg/mq"
"github.com/cloudreve/Cloudreve/v3/pkg/request" "github.com/cloudreve/Cloudreve/v3/pkg/request"
"github.com/cloudreve/Cloudreve/v3/pkg/serializer" "github.com/cloudreve/Cloudreve/v3/pkg/serializer"
"github.com/cloudreve/Cloudreve/v3/pkg/task"
"github.com/jinzhu/gorm" "github.com/jinzhu/gorm"
"net/url" "net/url"
"sync" "sync"
@ -31,15 +31,17 @@ type Controller interface {
SendNotification(string, string, mq.Message) error SendNotification(string, string, mq.Message) error
// Submit async task into task pool // Submit async task into task pool
SubmitTask(string, task.Job, string) error SubmitTask(string, interface{}, string, func(interface{})) error
// Get master node info // Get master node info
GetMasterInfo(string) (*MasterInfo, error) GetMasterInfo(string) (*MasterInfo, error)
// Get master OneDrive policy credential
GetOneDriveToken(string, uint) (string, error)
} }
type slaveController struct { type slaveController struct {
masters map[string]MasterInfo masters map[string]MasterInfo
client request.Client
lock sync.RWMutex lock sync.RWMutex
} }
@ -50,6 +52,7 @@ type MasterInfo struct {
URL *url.URL URL *url.URL
// used to invoke aria2 rpc calls // used to invoke aria2 rpc calls
Instance cluster.Node Instance cluster.Node
Client request.Client
jobTracker map[string]bool jobTracker map[string]bool
} }
@ -57,7 +60,6 @@ type MasterInfo struct {
func Init() { func Init() {
DefaultController = &slaveController{ DefaultController = &slaveController{
masters: make(map[string]MasterInfo), masters: make(map[string]MasterInfo),
client: request.NewClient(),
} }
gob.Register(rpc.StatusInfo{}) gob.Register(rpc.StatusInfo{})
} }
@ -82,9 +84,16 @@ func (c *slaveController) HandleHeartBeat(req *serializer.NodePingReq) (serializ
} }
c.masters[req.SiteID] = MasterInfo{ c.masters[req.SiteID] = MasterInfo{
ID: req.SiteID, ID: req.SiteID,
URL: masterUrl, URL: masterUrl,
TTL: req.CredentialTTL, TTL: req.CredentialTTL,
Client: request.NewClient(
request.WithEndpoint(masterUrl.String()),
request.WithSlaveMeta(fmt.Sprintf("%d", req.Node.ID)),
request.WithCredential(auth.HMACAuth{
SecretKey: []byte(req.Node.MasterKey),
}, int64(req.CredentialTTL)),
),
jobTracker: make(map[string]bool), jobTracker: make(map[string]bool),
Instance: cluster.NewNodeFromDBModel(&model.Node{ Instance: cluster.NewNodeFromDBModel(&model.Node{
Model: gorm.Model{ID: req.Node.ID}, Model: gorm.Model{ID: req.Node.ID},
@ -116,19 +125,16 @@ func (c *slaveController) SendNotification(id, subject string, msg mq.Message) e
if node, ok := c.masters[id]; ok { if node, ok := c.masters[id]; ok {
c.lock.RUnlock() c.lock.RUnlock()
apiPath, _ := url.Parse(fmt.Sprintf("/api/v3/slave/notification/%s", subject))
body := bytes.Buffer{} body := bytes.Buffer{}
enc := gob.NewEncoder(&body) enc := gob.NewEncoder(&body)
if err := enc.Encode(&msg); err != nil { if err := enc.Encode(&msg); err != nil {
return err return err
} }
res, err := c.client.Request( res, err := node.Client.Request(
"PUT", "PUT",
node.URL.ResolveReference(apiPath).String(), fmt.Sprintf("/api/v3/slave/notification/%s", subject),
&body, &body,
request.WithSlaveMeta(fmt.Sprintf("%d", node.Instance.ID())),
request.WithCredential(node.Instance.MasterAuthInstance(), int64(node.TTL)),
).CheckHTTPResponse(200).DecodeResponse() ).CheckHTTPResponse(200).DecodeResponse()
if err != nil { if err != nil {
return err return err
@ -146,7 +152,7 @@ func (c *slaveController) SendNotification(id, subject string, msg mq.Message) e
} }
// SubmitTask 提交异步任务 // SubmitTask 提交异步任务
func (c *slaveController) SubmitTask(id string, job task.Job, hash string) error { func (c *slaveController) SubmitTask(id string, job interface{}, hash string, submitter func(interface{})) error {
c.lock.RLock() c.lock.RLock()
defer c.lock.RUnlock() defer c.lock.RUnlock()
@ -156,7 +162,7 @@ func (c *slaveController) SubmitTask(id string, job task.Job, hash string) error
return nil return nil
} }
task.TaskPoll.Submit(job) submitter(job)
return nil return nil
} }
@ -174,3 +180,30 @@ func (c *slaveController) GetMasterInfo(id string) (*MasterInfo, error) {
return nil, ErrMasterNotFound return nil, ErrMasterNotFound
} }
// GetOneDriveToken 获取主机OneDrive凭证
func (c *slaveController) GetOneDriveToken(id string, policyID uint) (string, error) {
c.lock.RLock()
if node, ok := c.masters[id]; ok {
c.lock.RUnlock()
res, err := node.Client.Request(
"GET",
fmt.Sprintf("/api/v3/slave/credential/onedrive/%d", policyID),
nil,
).CheckHTTPResponse(200).DecodeResponse()
if err != nil {
return "", err
}
if res.Code != 0 {
return "", serializer.NewErrorFromResponse(res)
}
return res.Data.(string), nil
}
c.lock.RUnlock()
return "", ErrMasterNotFound
}

@ -97,7 +97,7 @@ func (job *TransferTask) Do() {
return return
} }
fs.SwitchToShadowHandler(master.Instance, master.URL.String()) fs.SwitchToShadowHandler(master.Instance, master.URL.String(), master.ID)
ctx := context.WithValue(context.Background(), fsctx.DisableOverwrite, true) ctx := context.WithValue(context.Background(), fsctx.DisableOverwrite, true)
file, err := os.Open(util.RelativePath(job.Req.Src)) file, err := os.Open(util.RelativePath(job.Req.Src))
if err != nil { if err != nil {

@ -254,3 +254,14 @@ func SlaveNotificationPush(c *gin.Context) {
c.JSON(200, ErrorResponse(err)) c.JSON(200, ErrorResponse(err))
} }
} }
// SlaveGetOneDriveCredential 从机获取主机的OneDrive存储策略凭证
func SlaveGetOneDriveCredential(c *gin.Context) {
var service node.OneDriveCredentialService
if err := c.ShouldBindUri(&service); err == nil {
res := service.Get(c)
c.JSON(200, res)
} else {
c.JSON(200, ErrorResponse(err))
}
}

@ -203,9 +203,12 @@ func InitMasterRouter() *gin.Engine {
slave := v3.Group("slave") slave := v3.Group("slave")
slave.Use(middleware.SlaveRPCSignRequired()) slave.Use(middleware.SlaveRPCSignRequired())
{ {
// 事件通知
slave.PUT("notification/:subject", controllers.SlaveNotificationPush) slave.PUT("notification/:subject", controllers.SlaveNotificationPush)
// 上传 // 上传
slave.POST("upload", controllers.SlaveUpload) slave.POST("upload", controllers.SlaveUpload)
// OneDrive 存储策略凭证
slave.GET("credential/onedrive/:id", controllers.SlaveGetOneDriveCredential)
} }
// 回调接口 // 回调接口

@ -10,6 +10,7 @@ import (
"github.com/cloudreve/Cloudreve/v3/pkg/filesystem/fsctx" "github.com/cloudreve/Cloudreve/v3/pkg/filesystem/fsctx"
"github.com/cloudreve/Cloudreve/v3/pkg/serializer" "github.com/cloudreve/Cloudreve/v3/pkg/serializer"
"github.com/cloudreve/Cloudreve/v3/pkg/slave" "github.com/cloudreve/Cloudreve/v3/pkg/slave"
"github.com/cloudreve/Cloudreve/v3/pkg/task"
"github.com/cloudreve/Cloudreve/v3/pkg/task/slavetask" "github.com/cloudreve/Cloudreve/v3/pkg/task/slavetask"
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
"github.com/jinzhu/gorm" "github.com/jinzhu/gorm"
@ -152,7 +153,9 @@ func CreateTransferTask(c *gin.Context, req *serializer.SlaveTransferReq) serial
MasterID: id.(string), MasterID: id.(string),
} }
if err := slave.DefaultController.SubmitTask(job.MasterID, job, req.Hash(job.MasterID)); err != nil { if err := slave.DefaultController.SubmitTask(job.MasterID, job, req.Hash(job.MasterID), func(job interface{}) {
task.TaskPoll.Submit(job.(task.Job))
}); err != nil {
return serializer.Err(serializer.CodeInternalSetting, "任务创建失败", err) return serializer.Err(serializer.CodeInternalSetting, "任务创建失败", err)
} }

@ -2,6 +2,8 @@ package node
import ( import (
"encoding/gob" "encoding/gob"
model "github.com/cloudreve/Cloudreve/v3/models"
"github.com/cloudreve/Cloudreve/v3/pkg/filesystem/driver/onedrive"
"github.com/cloudreve/Cloudreve/v3/pkg/mq" "github.com/cloudreve/Cloudreve/v3/pkg/mq"
"github.com/cloudreve/Cloudreve/v3/pkg/serializer" "github.com/cloudreve/Cloudreve/v3/pkg/serializer"
"github.com/cloudreve/Cloudreve/v3/pkg/slave" "github.com/cloudreve/Cloudreve/v3/pkg/slave"
@ -12,10 +14,14 @@ type SlaveNotificationService struct {
Subject string `uri:"subject" binding:"required"` Subject string `uri:"subject" binding:"required"`
} }
type OneDriveCredentialService struct {
PolicyID uint `uri:"id" binding:"required"`
}
func HandleMasterHeartbeat(req *serializer.NodePingReq) serializer.Response { func HandleMasterHeartbeat(req *serializer.NodePingReq) serializer.Response {
res, err := slave.DefaultController.HandleHeartBeat(req) res, err := slave.DefaultController.HandleHeartBeat(req)
if err != nil { if err != nil {
return serializer.Err(serializer.CodeInternalSetting, "无法初始化从机控制器", err) return serializer.Err(serializer.CodeInternalSetting, "Cannot initialize slave controller", err)
} }
return serializer.Response{ return serializer.Response{
@ -29,9 +35,28 @@ func (s *SlaveNotificationService) HandleSlaveNotificationPush(c *gin.Context) s
var msg mq.Message var msg mq.Message
dec := gob.NewDecoder(c.Request.Body) dec := gob.NewDecoder(c.Request.Body)
if err := dec.Decode(&msg); err != nil { if err := dec.Decode(&msg); err != nil {
return serializer.ParamErr("无法解析通知消息", err) return serializer.ParamErr("Cannot parse notification message", err)
} }
mq.GlobalMQ.Publish(s.Subject, msg) mq.GlobalMQ.Publish(s.Subject, msg)
return serializer.Response{} return serializer.Response{}
} }
// Get 获取主机OneDrive策略的AccessToken
func (s *OneDriveCredentialService) Get(c *gin.Context) serializer.Response {
policy, err := model.GetPolicyByID(s.PolicyID)
if err != nil {
return serializer.Err(serializer.CodeNotFound, "Cannot found storage policy", err)
}
client, err := onedrive.NewClient(&policy)
if err != nil {
return serializer.Err(serializer.CodeInternalSetting, "Cannot initialize OneDrive client", err)
}
if err := client.UpdateCredential(c); err != nil {
return serializer.Err(serializer.CodeInternalSetting, "Cannot refresh OneDrive credential", err)
}
return serializer.Response{Data: client.Credential.AccessToken}
}

Loading…
Cancel
Save