You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
452 lines
10 KiB
452 lines
10 KiB
package cluster
|
|
|
|
import (
|
|
"bytes"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
model "github.com/cloudreve/Cloudreve/v3/models"
|
|
"github.com/cloudreve/Cloudreve/v3/pkg/aria2/common"
|
|
"github.com/cloudreve/Cloudreve/v3/pkg/aria2/rpc"
|
|
"github.com/cloudreve/Cloudreve/v3/pkg/auth"
|
|
"github.com/cloudreve/Cloudreve/v3/pkg/conf"
|
|
"github.com/cloudreve/Cloudreve/v3/pkg/request"
|
|
"github.com/cloudreve/Cloudreve/v3/pkg/serializer"
|
|
"github.com/cloudreve/Cloudreve/v3/pkg/util"
|
|
"io"
|
|
"net/url"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
type SlaveNode struct {
|
|
Model *model.Node
|
|
Active bool
|
|
|
|
caller slaveCaller
|
|
callback func(bool, uint)
|
|
close chan bool
|
|
lock sync.RWMutex
|
|
}
|
|
|
|
type slaveCaller struct {
|
|
parent *SlaveNode
|
|
Client request.Client
|
|
}
|
|
|
|
// Init 初始化节点
|
|
func (node *SlaveNode) Init(nodeModel *model.Node) {
|
|
node.lock.Lock()
|
|
node.Model = nodeModel
|
|
|
|
// Init http request client
|
|
var endpoint *url.URL
|
|
if serverURL, err := url.Parse(node.Model.Server); err == nil {
|
|
var controller *url.URL
|
|
controller, _ = url.Parse("/api/v3/slave/")
|
|
endpoint = serverURL.ResolveReference(controller)
|
|
}
|
|
|
|
signTTL := model.GetIntSetting("slave_api_timeout", 60)
|
|
node.caller.Client = request.NewClient(
|
|
request.WithMasterMeta(),
|
|
request.WithTimeout(time.Duration(signTTL)*time.Second),
|
|
request.WithCredential(auth.HMACAuth{SecretKey: []byte(nodeModel.SlaveKey)}, int64(signTTL)),
|
|
request.WithEndpoint(endpoint.String()),
|
|
)
|
|
|
|
node.caller.parent = node
|
|
if node.close != nil {
|
|
node.lock.Unlock()
|
|
node.close <- true
|
|
go node.StartPingLoop()
|
|
} else {
|
|
node.Active = true
|
|
node.lock.Unlock()
|
|
go node.StartPingLoop()
|
|
}
|
|
}
|
|
|
|
// IsFeatureEnabled 查询节点的某项功能是否启用
|
|
func (node *SlaveNode) IsFeatureEnabled(feature string) bool {
|
|
node.lock.RLock()
|
|
defer node.lock.RUnlock()
|
|
|
|
switch feature {
|
|
case "aria2":
|
|
return node.Model.Aria2Enabled
|
|
default:
|
|
return false
|
|
}
|
|
}
|
|
|
|
// SubscribeStatusChange 订阅节点状态更改
|
|
func (node *SlaveNode) SubscribeStatusChange(callback func(bool, uint)) {
|
|
node.lock.Lock()
|
|
node.callback = callback
|
|
node.lock.Unlock()
|
|
}
|
|
|
|
// Ping 从机节点,返回从机负载
|
|
func (node *SlaveNode) Ping(req *serializer.NodePingReq) (*serializer.NodePingResp, error) {
|
|
node.lock.RLock()
|
|
defer node.lock.RUnlock()
|
|
|
|
reqBodyEncoded, err := json.Marshal(req)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
bodyReader := strings.NewReader(string(reqBodyEncoded))
|
|
|
|
resp, err := node.caller.Client.Request(
|
|
"POST",
|
|
"heartbeat",
|
|
bodyReader,
|
|
).CheckHTTPResponse(200).DecodeResponse()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// 处理列取结果
|
|
if resp.Code != 0 {
|
|
return nil, serializer.NewErrorFromResponse(resp)
|
|
}
|
|
|
|
var res serializer.NodePingResp
|
|
|
|
if resStr, ok := resp.Data.(string); ok {
|
|
err = json.Unmarshal([]byte(resStr), &res)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
return &res, nil
|
|
}
|
|
|
|
// IsActive 返回节点是否在线
|
|
func (node *SlaveNode) IsActive() bool {
|
|
node.lock.RLock()
|
|
defer node.lock.RUnlock()
|
|
|
|
return node.Active
|
|
}
|
|
|
|
// Kill 结束节点内相关循环
|
|
func (node *SlaveNode) Kill() {
|
|
node.lock.RLock()
|
|
defer node.lock.RUnlock()
|
|
|
|
if node.close != nil {
|
|
close(node.close)
|
|
}
|
|
}
|
|
|
|
// GetAria2Instance 获取从机Aria2实例
|
|
func (node *SlaveNode) GetAria2Instance() common.Aria2 {
|
|
node.lock.RLock()
|
|
defer node.lock.RUnlock()
|
|
|
|
if !node.Model.Aria2Enabled {
|
|
return &common.DummyAria2{}
|
|
}
|
|
|
|
return &node.caller
|
|
}
|
|
|
|
func (node *SlaveNode) ID() uint {
|
|
node.lock.RLock()
|
|
defer node.lock.RUnlock()
|
|
|
|
return node.Model.ID
|
|
}
|
|
|
|
func (node *SlaveNode) StartPingLoop() {
|
|
node.lock.Lock()
|
|
node.close = make(chan bool)
|
|
node.lock.Unlock()
|
|
|
|
tickDuration := time.Duration(model.GetIntSetting("slave_ping_interval", 300)) * time.Second
|
|
recoverDuration := time.Duration(model.GetIntSetting("slave_recover_interval", 600)) * time.Second
|
|
pingTicker := time.Duration(0)
|
|
|
|
util.Log().Debug("Slave node %q heartbeat loop started.", node.Model.Name)
|
|
retry := 0
|
|
recoverMode := false
|
|
isFirstLoop := true
|
|
|
|
loop:
|
|
for {
|
|
select {
|
|
case <-time.After(pingTicker):
|
|
if pingTicker == 0 {
|
|
pingTicker = tickDuration
|
|
}
|
|
|
|
util.Log().Debug("Slave node %q send ping.", node.Model.Name)
|
|
res, err := node.Ping(node.getHeartbeatContent(isFirstLoop))
|
|
isFirstLoop = false
|
|
|
|
if err != nil {
|
|
util.Log().Debug("Error while ping slave node %q: %s", node.Model.Name, err)
|
|
retry++
|
|
if retry >= model.GetIntSetting("slave_node_retry", 3) {
|
|
util.Log().Debug("Retry threshold for pinging slave node %q exceeded, mark it as offline.", node.Model.Name)
|
|
node.changeStatus(false)
|
|
|
|
if !recoverMode {
|
|
// 启动恢复监控循环
|
|
util.Log().Debug("Slave node %q entered recovery mode.", node.Model.Name)
|
|
pingTicker = recoverDuration
|
|
recoverMode = true
|
|
}
|
|
}
|
|
} else {
|
|
if recoverMode {
|
|
util.Log().Debug("Slave node %q recovered.", node.Model.Name)
|
|
pingTicker = tickDuration
|
|
recoverMode = false
|
|
isFirstLoop = true
|
|
}
|
|
|
|
util.Log().Debug("Status of slave node %q: %s", node.Model.Name, res)
|
|
node.changeStatus(true)
|
|
retry = 0
|
|
}
|
|
|
|
case <-node.close:
|
|
util.Log().Debug("Slave node %q received shutdown signal.", node.Model.Name)
|
|
break loop
|
|
}
|
|
}
|
|
}
|
|
|
|
func (node *SlaveNode) IsMater() bool {
|
|
return false
|
|
}
|
|
|
|
func (node *SlaveNode) MasterAuthInstance() auth.Auth {
|
|
node.lock.RLock()
|
|
defer node.lock.RUnlock()
|
|
|
|
return auth.HMACAuth{SecretKey: []byte(node.Model.MasterKey)}
|
|
}
|
|
|
|
func (node *SlaveNode) SlaveAuthInstance() auth.Auth {
|
|
node.lock.RLock()
|
|
defer node.lock.RUnlock()
|
|
|
|
return auth.HMACAuth{SecretKey: []byte(node.Model.SlaveKey)}
|
|
}
|
|
|
|
func (node *SlaveNode) DBModel() *model.Node {
|
|
node.lock.RLock()
|
|
defer node.lock.RUnlock()
|
|
|
|
return node.Model
|
|
}
|
|
|
|
// getHeartbeatContent gets serializer.NodePingReq used to send heartbeat to slave
|
|
func (node *SlaveNode) getHeartbeatContent(isUpdate bool) *serializer.NodePingReq {
|
|
return &serializer.NodePingReq{
|
|
SiteURL: model.GetSiteURL().String(),
|
|
IsUpdate: isUpdate,
|
|
SiteID: model.GetSettingByName("siteID"),
|
|
Node: node.Model,
|
|
CredentialTTL: model.GetIntSetting("slave_api_timeout", 60),
|
|
}
|
|
}
|
|
|
|
func (node *SlaveNode) changeStatus(isActive bool) {
|
|
node.lock.RLock()
|
|
id := node.Model.ID
|
|
if isActive != node.Active {
|
|
node.lock.RUnlock()
|
|
node.lock.Lock()
|
|
node.Active = isActive
|
|
node.lock.Unlock()
|
|
node.callback(isActive, id)
|
|
} else {
|
|
node.lock.RUnlock()
|
|
}
|
|
|
|
}
|
|
|
|
func (s *slaveCaller) Init() error {
|
|
return nil
|
|
}
|
|
|
|
// SendAria2Call send remote aria2 call to slave node
|
|
func (s *slaveCaller) SendAria2Call(body *serializer.SlaveAria2Call, scope string) (*serializer.Response, error) {
|
|
reqReader, err := getAria2RequestBody(body)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return s.Client.Request(
|
|
"POST",
|
|
"aria2/"+scope,
|
|
reqReader,
|
|
).CheckHTTPResponse(200).DecodeResponse()
|
|
}
|
|
|
|
func (s *slaveCaller) CreateTask(task *model.Download, options map[string]interface{}) (string, error) {
|
|
s.parent.lock.RLock()
|
|
defer s.parent.lock.RUnlock()
|
|
|
|
req := &serializer.SlaveAria2Call{
|
|
Task: task,
|
|
GroupOptions: options,
|
|
}
|
|
|
|
res, err := s.SendAria2Call(req, "task")
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
|
|
if res.Code != 0 {
|
|
return "", serializer.NewErrorFromResponse(res)
|
|
}
|
|
|
|
return res.Data.(string), err
|
|
}
|
|
|
|
func (s *slaveCaller) Status(task *model.Download) (rpc.StatusInfo, error) {
|
|
s.parent.lock.RLock()
|
|
defer s.parent.lock.RUnlock()
|
|
|
|
req := &serializer.SlaveAria2Call{
|
|
Task: task,
|
|
}
|
|
|
|
res, err := s.SendAria2Call(req, "status")
|
|
if err != nil {
|
|
return rpc.StatusInfo{}, err
|
|
}
|
|
|
|
if res.Code != 0 {
|
|
return rpc.StatusInfo{}, serializer.NewErrorFromResponse(res)
|
|
}
|
|
|
|
var status rpc.StatusInfo
|
|
res.GobDecode(&status)
|
|
|
|
return status, err
|
|
}
|
|
|
|
func (s *slaveCaller) Cancel(task *model.Download) error {
|
|
s.parent.lock.RLock()
|
|
defer s.parent.lock.RUnlock()
|
|
|
|
req := &serializer.SlaveAria2Call{
|
|
Task: task,
|
|
}
|
|
|
|
res, err := s.SendAria2Call(req, "cancel")
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if res.Code != 0 {
|
|
return serializer.NewErrorFromResponse(res)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *slaveCaller) Select(task *model.Download, files []int) error {
|
|
s.parent.lock.RLock()
|
|
defer s.parent.lock.RUnlock()
|
|
|
|
req := &serializer.SlaveAria2Call{
|
|
Task: task,
|
|
Files: files,
|
|
}
|
|
|
|
res, err := s.SendAria2Call(req, "select")
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if res.Code != 0 {
|
|
return serializer.NewErrorFromResponse(res)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *slaveCaller) GetConfig() model.Aria2Option {
|
|
s.parent.lock.RLock()
|
|
defer s.parent.lock.RUnlock()
|
|
|
|
return s.parent.Model.Aria2OptionsSerialized
|
|
}
|
|
|
|
func (s *slaveCaller) DeleteTempFile(task *model.Download) error {
|
|
s.parent.lock.RLock()
|
|
defer s.parent.lock.RUnlock()
|
|
|
|
req := &serializer.SlaveAria2Call{
|
|
Task: task,
|
|
}
|
|
|
|
res, err := s.SendAria2Call(req, "delete")
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if res.Code != 0 {
|
|
return serializer.NewErrorFromResponse(res)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func getAria2RequestBody(body *serializer.SlaveAria2Call) (io.Reader, error) {
|
|
reqBodyEncoded, err := json.Marshal(body)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return strings.NewReader(string(reqBodyEncoded)), nil
|
|
}
|
|
|
|
// RemoteCallback 发送远程存储策略上传回调请求
|
|
func RemoteCallback(url string, body serializer.UploadCallback) error {
|
|
callbackBody, err := json.Marshal(struct {
|
|
Data serializer.UploadCallback `json:"data"`
|
|
}{
|
|
Data: body,
|
|
})
|
|
if err != nil {
|
|
return serializer.NewError(serializer.CodeCallbackError, "Failed to encode callback content", err)
|
|
}
|
|
|
|
resp := request.GeneralClient.Request(
|
|
"POST",
|
|
url,
|
|
bytes.NewReader(callbackBody),
|
|
request.WithTimeout(time.Duration(conf.SlaveConfig.CallbackTimeout)*time.Second),
|
|
request.WithCredential(auth.General, int64(conf.SlaveConfig.SignatureTTL)),
|
|
)
|
|
|
|
if resp.Err != nil {
|
|
return serializer.NewError(serializer.CodeCallbackError, "Slave cannot send callback request", resp.Err)
|
|
}
|
|
|
|
// 解析回调服务端响应
|
|
response, err := resp.DecodeResponse()
|
|
if err != nil {
|
|
msg := fmt.Sprintf("Slave cannot parse callback response from master (StatusCode=%d).", resp.Response.StatusCode)
|
|
return serializer.NewError(serializer.CodeCallbackError, msg, err)
|
|
}
|
|
|
|
if response.Code != 0 {
|
|
return serializer.NewError(response.Code, response.Msg, errors.New(response.Error))
|
|
}
|
|
|
|
return nil
|
|
}
|