Feat: salve receive and handle heartbeat

pull/1040/head
HFO4 4 years ago
parent 97aaa35792
commit f79508095d

@ -9,6 +9,7 @@ import (
"github.com/cloudreve/Cloudreve/v3/pkg/conf" "github.com/cloudreve/Cloudreve/v3/pkg/conf"
"github.com/cloudreve/Cloudreve/v3/pkg/crontab" "github.com/cloudreve/Cloudreve/v3/pkg/crontab"
"github.com/cloudreve/Cloudreve/v3/pkg/email" "github.com/cloudreve/Cloudreve/v3/pkg/email"
"github.com/cloudreve/Cloudreve/v3/pkg/slave"
"github.com/cloudreve/Cloudreve/v3/pkg/task" "github.com/cloudreve/Cloudreve/v3/pkg/task"
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
) )
@ -21,6 +22,7 @@ func Init(path string) {
if !conf.SystemConfig.Debug { if !conf.SystemConfig.Debug {
gin.SetMode(gin.ReleaseMode) gin.SetMode(gin.ReleaseMode)
} }
cache.Init() cache.Init()
if conf.SystemConfig.Mode == "master" { if conf.SystemConfig.Mode == "master" {
model.Init() model.Init()
@ -30,6 +32,9 @@ func Init(path string) {
email.Init() email.Init()
crontab.Init() crontab.Init()
InitStatic() InitStatic()
} else {
slave.Init()
} }
auth.Init() auth.Init()
} }

@ -12,12 +12,13 @@ type Node struct {
Name string // 节点别名 Name string // 节点别名
Type ModelType // 节点状态 Type ModelType // 节点状态
Server string // 服务器地址 Server string // 服务器地址
SecretKey string `gorm:"type:text"` // 通信密钥 SlaveKey string `gorm:"type:text"` // 主->从 通信密钥
MasterKey string `gorm:"type:text"` // 从->主 通信密钥
Aria2Enabled bool // 是否支持用作离线下载节点 Aria2Enabled bool // 是否支持用作离线下载节点
Aria2Options string `gorm:"type:text"` // 离线下载配置 Aria2Options string `gorm:"type:text"` // 离线下载配置
// 数据库忽略字段 // 数据库忽略字段
Aria2OptionsSerialized Aria2Option `gorm:"-"` Aria2OptionsSerialized Aria2Option `gorm:"-" json:"-"`
} }
// Aria2Option 非公有的Aria2配置属性 // Aria2Option 非公有的Aria2配置属性

@ -18,6 +18,13 @@ var LB balancer.Balancer
// Lock Instance的读写锁 // Lock Instance的读写锁
var Lock sync.RWMutex var Lock sync.RWMutex
// GetLoadBalancer 返回供Aria2使用的负载均衡器
func GetLoadBalancer() balancer.Balancer {
Lock.RLock()
defer Lock.RUnlock()
return LB
}
// Init 初始化 // Init 初始化
func Init(isReload bool) { func Init(isReload bool) {
Lock.Lock() Lock.Lock()

@ -84,6 +84,13 @@ func (node *MasterNode) IsActive() bool {
return true return true
} }
// Kill 结束aria2请求
func (node *MasterNode) Kill() {
if node.aria2RPC.Caller != nil {
node.aria2RPC.Caller.Close()
}
}
// GetAria2Instance 获取主机Aria2实例 // GetAria2Instance 获取主机Aria2实例
func (node *MasterNode) GetAria2Instance() common.Aria2 { func (node *MasterNode) GetAria2Instance() common.Aria2 {
if !node.Model.Aria2Enabled { if !node.Model.Aria2Enabled {

@ -21,9 +21,12 @@ type Node interface {
GetAria2Instance() common.Aria2 GetAria2Instance() common.Aria2
// Returns unique id of this node // Returns unique id of this node
ID() uint ID() uint
// Kill node and recycle resources
Kill()
} }
func getNodeFromDBModel(node *model.Node) Node { // Create new node from DB model
func NewNodeFromDBModel(node *model.Node) Node {
switch node.Type { switch node.Type {
case model.SlaveNodeType: case model.SlaveNodeType:
slave := &SlaveNode{} slave := &SlaveNode{}

@ -95,7 +95,7 @@ func (pool *NodePool) initFromDB() error {
pool.active = make(map[uint]Node) pool.active = make(map[uint]Node)
pool.inactive = make(map[uint]Node) pool.inactive = make(map[uint]Node)
for i := 0; i < len(nodes); i++ { for i := 0; i < len(nodes); i++ {
newNode := getNodeFromDBModel(&nodes[i]) newNode := NewNodeFromDBModel(&nodes[i])
if newNode.IsActive() { if newNode.IsActive() {
pool.active[nodes[i].ID] = newNode pool.active[nodes[i].ID] = newNode
} else { } else {

@ -31,7 +31,7 @@ type SlaveNode struct {
func (node *SlaveNode) Init(nodeModel *model.Node) { func (node *SlaveNode) Init(nodeModel *model.Node) {
node.lock.Lock() node.lock.Lock()
node.Model = nodeModel node.Model = nodeModel
node.AuthInstance = auth.HMACAuth{SecretKey: []byte(nodeModel.SecretKey)} node.AuthInstance = auth.HMACAuth{SecretKey: []byte(nodeModel.SlaveKey)}
node.Client = request.HTTPClient{} node.Client = request.HTTPClient{}
node.Active = true node.Active = true
if node.close != nil { if node.close != nil {
@ -99,6 +99,28 @@ func (node *SlaveNode) IsActive() bool {
return node.Active return node.Active
} }
// Kill 结束节点内相关循环
func (node *SlaveNode) Kill() {
node.lock.RLock()
defer node.lock.RUnlock()
if node.close != nil {
close(node.close)
}
}
// GetAria2Instance 获取从机Aria2实例
func (node *SlaveNode) GetAria2Instance() common.Aria2 {
return nil
}
func (node *SlaveNode) ID() uint {
node.lock.RLock()
defer node.lock.RUnlock()
return node.Model.ID
}
// getAPIUrl 获取接口请求地址 // getAPIUrl 获取接口请求地址
func (node *SlaveNode) getAPIUrl(scope string) string { func (node *SlaveNode) getAPIUrl(scope string) string {
node.lock.RLock() node.lock.RLock()
@ -136,8 +158,7 @@ func (node *SlaveNode) StartPingLoop() {
tickDuration := time.Duration(model.GetIntSetting("slave_ping_interval", 300)) * time.Second tickDuration := time.Duration(model.GetIntSetting("slave_ping_interval", 300)) * time.Second
recoverDuration := time.Duration(model.GetIntSetting("slave_recover_interval", 600)) * time.Second recoverDuration := time.Duration(model.GetIntSetting("slave_recover_interval", 600)) * time.Second
pingTicker := time.NewTicker(tickDuration) pingTicker := time.Duration(0)
defer pingTicker.Stop()
util.Log().Debug("从机节点 [%s] 启动心跳循环", node.Model.Name) util.Log().Debug("从机节点 [%s] 启动心跳循环", node.Model.Name)
retry := 0 retry := 0
@ -146,9 +167,13 @@ func (node *SlaveNode) StartPingLoop() {
loop: loop:
for { for {
select { select {
case <-pingTicker.C: case <-time.After(pingTicker):
if pingTicker == 0 {
pingTicker = tickDuration
}
util.Log().Debug("从机节点 [%s] 发送Ping", node.Model.Name) util.Log().Debug("从机节点 [%s] 发送Ping", node.Model.Name)
res, err := node.Ping(&serializer.NodePingReq{}) res, err := node.Ping(node.getHeartbeatContent(false))
if err != nil { if err != nil {
util.Log().Debug("Ping从机节点 [%s] 时发生错误: %s", node.Model.Name, err) util.Log().Debug("Ping从机节点 [%s] 时发生错误: %s", node.Model.Name, err)
retry++ retry++
@ -159,16 +184,14 @@ loop:
if !recoverMode { if !recoverMode {
// 启动恢复监控循环 // 启动恢复监控循环
util.Log().Debug("从机节点 [%s] 进入恢复模式", node.Model.Name) util.Log().Debug("从机节点 [%s] 进入恢复模式", node.Model.Name)
pingTicker.Stop() pingTicker = recoverDuration
pingTicker = time.NewTicker(recoverDuration)
recoverMode = true recoverMode = true
} }
} }
} else { } else {
if recoverMode { if recoverMode {
util.Log().Debug("从机节点 [%s] 复活", node.Model.Name) util.Log().Debug("从机节点 [%s] 复活", node.Model.Name)
pingTicker.Stop() pingTicker = tickDuration
pingTicker = time.NewTicker(tickDuration)
recoverMode = false recoverMode = false
} }
@ -184,14 +207,11 @@ loop:
} }
} }
// GetAria2Instance 获取从机Aria2实例 // getHeartbeatContent gets serializer.NodePingReq used to send heartbeat to slave
func (node *SlaveNode) GetAria2Instance() common.Aria2 { func (node *SlaveNode) getHeartbeatContent(isUpdate bool) *serializer.NodePingReq {
return nil return &serializer.NodePingReq{
} IsUpdate: isUpdate,
MasterURL: model.GetSiteURL().String(),
func (node *SlaveNode) ID() uint { Node: node.Model,
node.lock.RLock() }
defer node.lock.RUnlock()
return node.Model.ID
} }

@ -1,5 +1,7 @@
package serializer package serializer
import model "github.com/cloudreve/Cloudreve/v3/models"
// RemoteDeleteRequest 远程策略删除接口请求正文 // RemoteDeleteRequest 远程策略删除接口请求正文
type RemoteDeleteRequest struct { type RemoteDeleteRequest struct {
Files []string `json:"files"` Files []string `json:"files"`
@ -13,6 +15,9 @@ type ListRequest struct {
// NodePingReq 从机节点Ping请求 // NodePingReq 从机节点Ping请求
type NodePingReq struct { type NodePingReq struct {
MasterURL string `json:"master_url"`
IsUpdate bool `json:"is_update"`
Node *model.Node `json:"node"`
} }
// NodePingResp 从机节点Ping响应 // NodePingResp 从机节点Ping响应

@ -0,0 +1,68 @@
package slave
import (
model "github.com/cloudreve/Cloudreve/v3/models"
"github.com/cloudreve/Cloudreve/v3/pkg/auth"
"github.com/cloudreve/Cloudreve/v3/pkg/cluster"
"github.com/cloudreve/Cloudreve/v3/pkg/serializer"
"sync"
)
var DefaultController Controller
// Controller controls communications between master and slave
type Controller interface {
// Handle heartbeat sent from master
HandleHeartBeat(*serializer.NodePingReq) (serializer.NodePingResp, error)
}
type slaveController struct {
masters map[string]masterInfo
lock sync.RWMutex
}
// info of master node
type masterInfo struct {
slaveID uint
url string
authClient auth.Auth
// used to invoke aria2 rpc calls
instance cluster.Node
}
func Init() {
DefaultController = &slaveController{
masters: make(map[string]masterInfo),
}
}
func (c *slaveController) HandleHeartBeat(req *serializer.NodePingReq) (serializer.NodePingResp, error) {
c.lock.Lock()
defer c.lock.Unlock()
req.Node.AfterFind()
// close old node if exist
origin, ok := c.masters[req.MasterURL]
if (ok && req.IsUpdate) || !ok {
if ok {
origin.instance.Kill()
}
c.masters[req.MasterURL] = masterInfo{
slaveID: req.Node.ID,
url: req.MasterURL,
authClient: auth.HMACAuth{
SecretKey: []byte(req.Node.MasterKey),
},
instance: cluster.NewNodeFromDBModel(&model.Node{
Type: model.MasterNodeType,
Aria2Enabled: req.Node.Aria2Enabled,
Aria2OptionsSerialized: req.Node.Aria2OptionsSerialized,
}),
}
}
return serializer.NodePingResp{}, nil
}

@ -1,10 +1,18 @@
package node package node
import "github.com/cloudreve/Cloudreve/v3/pkg/serializer" import (
"github.com/cloudreve/Cloudreve/v3/pkg/serializer"
"github.com/cloudreve/Cloudreve/v3/pkg/slave"
)
func HandleMasterHeartbeat(req *serializer.NodePingReq) serializer.Response { func HandleMasterHeartbeat(req *serializer.NodePingReq) serializer.Response {
res, err := slave.DefaultController.HandleHeartBeat(req)
if err != nil {
return serializer.Err(serializer.CodeInternalSetting, "无法初始化从机控制器", err)
}
return serializer.Response{ return serializer.Response{
Code: 0, Code: 0,
Data: serializer.NodePingResp{}, Data: res,
} }
} }

Loading…
Cancel
Save