Feat: slave transfer file in slave policy

pull/1040/head
HFO4 4 years ago
parent a7448ed0ea
commit f73abd021b

@ -78,7 +78,7 @@ func getSignContent(r *http.Request) (rawSignString string) {
// 决定要签名的header // 决定要签名的header
var signedHeader []string var signedHeader []string
for k, _ := range r.Header { for k, _ := range r.Header {
if strings.HasPrefix(k, "X-") { if strings.HasPrefix(k, "X-") && k != "X-Filename" {
signedHeader = append(signedHeader, fmt.Sprintf("%s=%s", k, r.Header.Get(k))) signedHeader = append(signedHeader, fmt.Sprintf("%s=%s", k, r.Header.Get(k)))
} }
} }

@ -170,7 +170,6 @@ func (handler Driver) Put(ctx context.Context, file io.ReadCloser, dst string, s
handler.Policy.GetUploadURL(), handler.Policy.GetUploadURL(),
file, file,
request.WithHeader(map[string][]string{ request.WithHeader(map[string][]string{
"Authorization": {credential.Token},
"X-Policy": {credential.Policy}, "X-Policy": {credential.Policy},
"X-FileName": {fileName}, "X-FileName": {fileName},
"X-Overwrite": {overwrite}, "X-Overwrite": {overwrite},
@ -178,6 +177,7 @@ func (handler Driver) Put(ctx context.Context, file io.ReadCloser, dst string, s
request.WithContentLength(int64(size)), request.WithContentLength(int64(size)),
request.WithTimeout(time.Duration(0)), request.WithTimeout(time.Duration(0)),
request.WithMasterMeta(), request.WithMasterMeta(),
request.WithCredential(handler.AuthInstance, int64(credentialTTL)),
).CheckHTTPResponse(200).DecodeResponse() ).CheckHTTPResponse(200).DecodeResponse()
if err != nil { if err != nil {
return err return err

@ -3,6 +3,7 @@ package masterinslave
import ( import (
"context" "context"
model "github.com/cloudreve/Cloudreve/v3/models" model "github.com/cloudreve/Cloudreve/v3/models"
"github.com/cloudreve/Cloudreve/v3/pkg/cluster"
"github.com/cloudreve/Cloudreve/v3/pkg/filesystem/driver" "github.com/cloudreve/Cloudreve/v3/pkg/filesystem/driver"
"github.com/cloudreve/Cloudreve/v3/pkg/filesystem/response" "github.com/cloudreve/Cloudreve/v3/pkg/filesystem/response"
"github.com/cloudreve/Cloudreve/v3/pkg/serializer" "github.com/cloudreve/Cloudreve/v3/pkg/serializer"
@ -12,15 +13,15 @@ import (
// Driver 影子存储策略,用于在从机端上传文件 // Driver 影子存储策略,用于在从机端上传文件
type Driver struct { type Driver struct {
masterID string master cluster.Node
handler driver.Handler handler driver.Handler
policy *model.Policy policy *model.Policy
} }
// NewDriver 返回新的处理器 // NewDriver 返回新的处理器
func NewDriver(masterID string, handler driver.Handler, policy *model.Policy) driver.Handler { func NewDriver(master cluster.Node, handler driver.Handler, policy *model.Policy) driver.Handler {
return &Driver{ return &Driver{
masterID: masterID, master: master,
handler: handler, handler: handler,
policy: policy, policy: policy,
} }

@ -97,7 +97,7 @@ func (d *Driver) Put(ctx context.Context, file io.ReadCloser, dst string, size u
} }
func (d *Driver) Delete(ctx context.Context, files []string) ([]string, error) { func (d *Driver) Delete(ctx context.Context, files []string) ([]string, error) {
return nil, ErrNotImplemented return d.handler.Delete(ctx, files)
} }
func (d *Driver) Get(ctx context.Context, path string) (response.RSCloser, error) { func (d *Driver) Get(ctx context.Context, path string) (response.RSCloser, error) {

@ -248,8 +248,19 @@ func (fs *FileSystem) SwitchToSlaveHandler(node cluster.Node) {
} }
// SwitchToShadowHandler 将负责上传的 Handler 切换为从机节点转存使用的影子处理器 // SwitchToShadowHandler 将负责上传的 Handler 切换为从机节点转存使用的影子处理器
func (fs *FileSystem) SwitchToShadowHandler(masterID string) { func (fs *FileSystem) SwitchToShadowHandler(master cluster.Node, masterURL string) {
fs.Handler = masterinslave.NewDriver(masterID, fs.Handler, fs.Policy) // 交换主从存储策略
if fs.Policy.Type == "remote" {
fs.Policy.Type = "local"
fs.DispatchHandler()
} else if fs.Policy.Type == "local" {
fs.Policy.Type = "remote"
fs.Policy.Server = masterURL
fs.Policy.SecretKey = master.DBModel().MasterKey
fs.DispatchHandler()
}
fs.Handler = masterinslave.NewDriver(master, fs.Handler, fs.Policy)
} }
// SetTargetFile 设置当前处理的目标文件 // SetTargetFile 设置当前处理的目标文件

@ -19,7 +19,7 @@ import (
) )
// GeneralClient 通用 HTTP Client // GeneralClient 通用 HTTP Client
var GeneralClient Client = HTTPClient{} var GeneralClient Client = NewClient()
// Response 请求的响应或错误信息 // Response 请求的响应或错误信息
type Response struct { type Response struct {

@ -11,6 +11,7 @@ import (
"github.com/cloudreve/Cloudreve/v3/pkg/serializer" "github.com/cloudreve/Cloudreve/v3/pkg/serializer"
) )
// TODO: move to slave pkg
// RemoteCallback 发送远程存储策略上传回调请求 // RemoteCallback 发送远程存储策略上传回调请求
func RemoteCallback(url string, body serializer.UploadCallback) error { func RemoteCallback(url string, body serializer.UploadCallback) error {
callbackBody, err := json.Marshal(struct { callbackBody, err := json.Marshal(struct {

@ -24,7 +24,7 @@ type Controller interface {
// Handle heartbeat sent from master // Handle heartbeat sent from master
HandleHeartBeat(*serializer.NodePingReq) (serializer.NodePingResp, error) HandleHeartBeat(*serializer.NodePingReq) (serializer.NodePingResp, error)
// Get Aria2 instance by master node id // Get Aria2 Instance by master node ID
GetAria2Instance(string) (common.Aria2, error) GetAria2Instance(string) (common.Aria2, error)
// Send event change message to master node // Send event change message to master node
@ -32,28 +32,32 @@ type Controller interface {
// Submit async task into task pool // Submit async task into task pool
SubmitTask(string, task.Job, string) error SubmitTask(string, task.Job, string) error
// Get master node info
GetMasterInfo(string) (*MasterInfo, error)
} }
type slaveController struct { type slaveController struct {
masters map[string]masterInfo masters map[string]MasterInfo
client request.Client client request.Client
lock sync.RWMutex lock sync.RWMutex
} }
// info of master node // info of master node
type masterInfo struct { type MasterInfo struct {
slaveID uint SlaveID uint
id string ID string
ttl int TTL int
url *url.URL URL *url.URL
jobTracker map[string]bool
// used to invoke aria2 rpc calls // used to invoke aria2 rpc calls
instance cluster.Node Instance cluster.Node
jobTracker map[string]bool
} }
func Init() { func Init() {
DefaultController = &slaveController{ DefaultController = &slaveController{
masters: make(map[string]masterInfo), masters: make(map[string]MasterInfo),
client: request.NewClient(), client: request.NewClient(),
} }
gob.Register(rpc.StatusInfo{}) gob.Register(rpc.StatusInfo{})
@ -70,7 +74,7 @@ func (c *slaveController) HandleHeartBeat(req *serializer.NodePingReq) (serializ
if (ok && req.IsUpdate) || !ok { if (ok && req.IsUpdate) || !ok {
if ok { if ok {
origin.instance.Kill() origin.Instance.Kill()
} }
masterUrl, err := url.Parse(req.SiteURL) masterUrl, err := url.Parse(req.SiteURL)
@ -78,13 +82,13 @@ func (c *slaveController) HandleHeartBeat(req *serializer.NodePingReq) (serializ
return serializer.NodePingResp{}, err return serializer.NodePingResp{}, err
} }
c.masters[req.SiteID] = masterInfo{ c.masters[req.SiteID] = MasterInfo{
slaveID: req.Node.ID, SlaveID: req.Node.ID,
id: req.SiteID, ID: req.SiteID,
url: masterUrl, URL: masterUrl,
ttl: req.CredentialTTL, TTL: req.CredentialTTL,
jobTracker: make(map[string]bool), jobTracker: make(map[string]bool),
instance: cluster.NewNodeFromDBModel(&model.Node{ Instance: cluster.NewNodeFromDBModel(&model.Node{
MasterKey: req.Node.MasterKey, MasterKey: req.Node.MasterKey,
Type: model.MasterNodeType, Type: model.MasterNodeType,
Aria2Enabled: req.Node.Aria2Enabled, Aria2Enabled: req.Node.Aria2Enabled,
@ -101,7 +105,7 @@ func (c *slaveController) GetAria2Instance(id string) (common.Aria2, error) {
defer c.lock.RUnlock() defer c.lock.RUnlock()
if node, ok := c.masters[id]; ok { if node, ok := c.masters[id]; ok {
return node.instance.GetAria2Instance(), nil return node.Instance.GetAria2Instance(), nil
} }
return nil, ErrMasterNotFound return nil, ErrMasterNotFound
@ -122,10 +126,10 @@ func (c *slaveController) SendNotification(id, subject string, msg mq.Message) e
res, err := c.client.Request( res, err := c.client.Request(
"PUT", "PUT",
node.url.ResolveReference(apiPath).String(), node.URL.ResolveReference(apiPath).String(),
&body, &body,
request.WithHeader(http.Header{"X-Node-Id": []string{fmt.Sprintf("%d", node.slaveID)}}), request.WithHeader(http.Header{"X-Node-Id": []string{fmt.Sprintf("%d", node.SlaveID)}}),
request.WithCredential(node.instance.MasterAuthInstance(), int64(node.ttl)), request.WithCredential(node.Instance.MasterAuthInstance(), int64(node.TTL)),
).CheckHTTPResponse(200).DecodeResponse() ).CheckHTTPResponse(200).DecodeResponse()
if err != nil { if err != nil {
return err return err
@ -159,3 +163,15 @@ func (c *slaveController) SubmitTask(id string, job task.Job, hash string) error
return ErrMasterNotFound return ErrMasterNotFound
} }
// GetMasterInfo 获取主机节点信息
func (c *slaveController) GetMasterInfo(id string) (*MasterInfo, error) {
c.lock.RLock()
defer c.lock.RUnlock()
if node, ok := c.masters[id]; ok {
return &node, nil
}
return nil, ErrMasterNotFound
}

@ -91,7 +91,13 @@ func (job *TransferTask) Do() {
return return
} }
fs.SwitchToShadowHandler(job.MasterID) master, err := slave.DefaultController.GetMasterInfo(job.MasterID)
if err != nil {
job.SetErrorMsg("找不到主机节点", err)
return
}
fs.SwitchToShadowHandler(master.Instance, master.URL.String())
ctx := context.WithValue(context.Background(), fsctx.DisableOverwrite, true) ctx := context.WithValue(context.Background(), fsctx.DisableOverwrite, true)
file, err := os.Open(util.RelativePath(job.Req.Src)) file, err := os.Open(util.RelativePath(job.Req.Src))
if err != nil { if err != nil {

@ -204,6 +204,8 @@ func InitMasterRouter() *gin.Engine {
slave.Use(middleware.SlaveRPCSignRequired()) slave.Use(middleware.SlaveRPCSignRequired())
{ {
slave.PUT("notification/:subject", controllers.SlaveNotificationPush) slave.PUT("notification/:subject", controllers.SlaveNotificationPush)
// 上传
slave.POST("upload", controllers.SlaveUpload)
} }
// 回调接口 // 回调接口

Loading…
Cancel
Save