From f79508095d46c0c50f36d915a6fec0b128451604 Mon Sep 17 00:00:00 2001 From: HFO4 <912394456@qq.com> Date: Sat, 21 Aug 2021 11:04:04 +0800 Subject: [PATCH] Feat: slave receive and handle heartbeat --- bootstrap/init.go | 5 +++ models/node.go | 5 +-- pkg/aria2/aria2.go | 7 +++++ pkg/cluster/master.go | 7 +++++ pkg/cluster/node.go | 5 ++- pkg/cluster/pool.go | 2 +- pkg/cluster/slave.go | 58 +++++++++++++++++++++++------------ pkg/serializer/slave.go | 5 +++ pkg/slave/slave.go | 68 +++++++++++++++++++++++++++++++++++++++++ service/node/fabric.go | 12 ++++++-- 10 files changed, 149 insertions(+), 25 deletions(-) create mode 100644 pkg/slave/slave.go diff --git a/bootstrap/init.go b/bootstrap/init.go index 775ecd12..729a5777 100644 --- a/bootstrap/init.go +++ b/bootstrap/init.go @@ -9,6 +9,7 @@ import ( "github.com/cloudreve/Cloudreve/v3/pkg/conf" "github.com/cloudreve/Cloudreve/v3/pkg/crontab" "github.com/cloudreve/Cloudreve/v3/pkg/email" + "github.com/cloudreve/Cloudreve/v3/pkg/slave" "github.com/cloudreve/Cloudreve/v3/pkg/task" "github.com/gin-gonic/gin" ) @@ -21,6 +22,7 @@ func Init(path string) { if !conf.SystemConfig.Debug { gin.SetMode(gin.ReleaseMode) } + cache.Init() if conf.SystemConfig.Mode == "master" { model.Init() @@ -30,6 +32,9 @@ func Init(path string) { email.Init() crontab.Init() InitStatic() + } else { + slave.Init() } + auth.Init() } diff --git a/models/node.go b/models/node.go index fde6d3ea..bf831252 100644 --- a/models/node.go +++ b/models/node.go @@ -12,12 +12,13 @@ type Node struct { Name string // 节点别名 Type ModelType // 节点状态 Server string // 服务器地址 - SecretKey string `gorm:"type:text"` // 通信密钥 + SlaveKey string `gorm:"type:text"` // 主->从 通信密钥 + MasterKey string `gorm:"type:text"` // 从->主 通信密钥 Aria2Enabled bool // 是否支持用作离线下载节点 Aria2Options string `gorm:"type:text"` // 离线下载配置 // 数据库忽略字段 - Aria2OptionsSerialized Aria2Option `gorm:"-"` + Aria2OptionsSerialized Aria2Option `gorm:"-" json:"-"` } // Aria2Option 非公有的Aria2配置属性 diff --git 
a/pkg/aria2/aria2.go b/pkg/aria2/aria2.go index fe0cc721..b86ecd99 100644 --- a/pkg/aria2/aria2.go +++ b/pkg/aria2/aria2.go @@ -18,6 +18,13 @@ var LB balancer.Balancer // Lock Instance的读写锁 var Lock sync.RWMutex +// GetLoadBalancer 返回供Aria2使用的负载均衡器 +func GetLoadBalancer() balancer.Balancer { + Lock.RLock() + defer Lock.RUnlock() + return LB +} + // Init 初始化 func Init(isReload bool) { Lock.Lock() diff --git a/pkg/cluster/master.go b/pkg/cluster/master.go index b0074c62..9389af20 100644 --- a/pkg/cluster/master.go +++ b/pkg/cluster/master.go @@ -84,6 +84,13 @@ func (node *MasterNode) IsActive() bool { return true } +// Kill 结束aria2请求 +func (node *MasterNode) Kill() { + if node.aria2RPC.Caller != nil { + node.aria2RPC.Caller.Close() + } +} + // GetAria2Instance 获取主机Aria2实例 func (node *MasterNode) GetAria2Instance() common.Aria2 { if !node.Model.Aria2Enabled { diff --git a/pkg/cluster/node.go b/pkg/cluster/node.go index 021d133a..6620ab79 100644 --- a/pkg/cluster/node.go +++ b/pkg/cluster/node.go @@ -21,9 +21,12 @@ type Node interface { GetAria2Instance() common.Aria2 // Returns unique id of this node ID() uint + // Kill node and recycle resources + Kill() } -func getNodeFromDBModel(node *model.Node) Node { +// Create new node from DB model +func NewNodeFromDBModel(node *model.Node) Node { switch node.Type { case model.SlaveNodeType: slave := &SlaveNode{} diff --git a/pkg/cluster/pool.go b/pkg/cluster/pool.go index 3caf3c2c..bc965dc7 100644 --- a/pkg/cluster/pool.go +++ b/pkg/cluster/pool.go @@ -95,7 +95,7 @@ func (pool *NodePool) initFromDB() error { pool.active = make(map[uint]Node) pool.inactive = make(map[uint]Node) for i := 0; i < len(nodes); i++ { - newNode := getNodeFromDBModel(&nodes[i]) + newNode := NewNodeFromDBModel(&nodes[i]) if newNode.IsActive() { pool.active[nodes[i].ID] = newNode } else { diff --git a/pkg/cluster/slave.go b/pkg/cluster/slave.go index 343ebc6d..8cdba660 100644 --- a/pkg/cluster/slave.go +++ b/pkg/cluster/slave.go @@ -31,7 +31,7 @@ type 
SlaveNode struct { func (node *SlaveNode) Init(nodeModel *model.Node) { node.lock.Lock() node.Model = nodeModel - node.AuthInstance = auth.HMACAuth{SecretKey: []byte(nodeModel.SecretKey)} + node.AuthInstance = auth.HMACAuth{SecretKey: []byte(nodeModel.SlaveKey)} node.Client = request.HTTPClient{} node.Active = true if node.close != nil { @@ -99,6 +99,28 @@ func (node *SlaveNode) IsActive() bool { return node.Active } +// Kill 结束节点内相关循环 +func (node *SlaveNode) Kill() { + node.lock.RLock() + defer node.lock.RUnlock() + + if node.close != nil { + close(node.close) + } +} + +// GetAria2Instance 获取从机Aria2实例 +func (node *SlaveNode) GetAria2Instance() common.Aria2 { + return nil +} + +func (node *SlaveNode) ID() uint { + node.lock.RLock() + defer node.lock.RUnlock() + + return node.Model.ID +} + // getAPIUrl 获取接口请求地址 func (node *SlaveNode) getAPIUrl(scope string) string { node.lock.RLock() @@ -136,8 +158,7 @@ func (node *SlaveNode) StartPingLoop() { tickDuration := time.Duration(model.GetIntSetting("slave_ping_interval", 300)) * time.Second recoverDuration := time.Duration(model.GetIntSetting("slave_recover_interval", 600)) * time.Second - pingTicker := time.NewTicker(tickDuration) - defer pingTicker.Stop() + pingTicker := time.Duration(0) util.Log().Debug("从机节点 [%s] 启动心跳循环", node.Model.Name) retry := 0 @@ -146,9 +167,13 @@ func (node *SlaveNode) StartPingLoop() { loop: for { select { - case <-pingTicker.C: + case <-time.After(pingTicker): + if pingTicker == 0 { + pingTicker = tickDuration + } + util.Log().Debug("从机节点 [%s] 发送Ping", node.Model.Name) - res, err := node.Ping(&serializer.NodePingReq{}) + res, err := node.Ping(node.getHeartbeatContent(false)) if err != nil { util.Log().Debug("Ping从机节点 [%s] 时发生错误: %s", node.Model.Name, err) retry++ @@ -159,16 +184,14 @@ loop: if !recoverMode { // 启动恢复监控循环 util.Log().Debug("从机节点 [%s] 进入恢复模式", node.Model.Name) - pingTicker.Stop() - pingTicker = time.NewTicker(recoverDuration) + pingTicker = recoverDuration recoverMode = true } } 
} else { if recoverMode { util.Log().Debug("从机节点 [%s] 复活", node.Model.Name) - pingTicker.Stop() - pingTicker = time.NewTicker(tickDuration) + pingTicker = tickDuration recoverMode = false } @@ -184,14 +207,11 @@ loop: } } -// GetAria2Instance 获取从机Aria2实例 -func (node *SlaveNode) GetAria2Instance() common.Aria2 { - return nil -} - -func (node *SlaveNode) ID() uint { - node.lock.RLock() - defer node.lock.RUnlock() - - return node.Model.ID +// getHeartbeatContent gets serializer.NodePingReq used to send heartbeat to slave +func (node *SlaveNode) getHeartbeatContent(isUpdate bool) *serializer.NodePingReq { + return &serializer.NodePingReq{ + IsUpdate: isUpdate, + MasterURL: model.GetSiteURL().String(), + Node: node.Model, + } } diff --git a/pkg/serializer/slave.go b/pkg/serializer/slave.go index f0ee1877..5d19adbf 100644 --- a/pkg/serializer/slave.go +++ b/pkg/serializer/slave.go @@ -1,5 +1,7 @@ package serializer +import model "github.com/cloudreve/Cloudreve/v3/models" + // RemoteDeleteRequest 远程策略删除接口请求正文 type RemoteDeleteRequest struct { Files []string `json:"files"` @@ -13,6 +15,9 @@ type ListRequest struct { // NodePingReq 从机节点Ping请求 type NodePingReq struct { + MasterURL string `json:"master_url"` + IsUpdate bool `json:"is_update"` + Node *model.Node `json:"node"` } // NodePingResp 从机节点Ping响应 diff --git a/pkg/slave/slave.go b/pkg/slave/slave.go new file mode 100644 index 00000000..af900aed --- /dev/null +++ b/pkg/slave/slave.go @@ -0,0 +1,68 @@ +package slave + +import ( + model "github.com/cloudreve/Cloudreve/v3/models" + "github.com/cloudreve/Cloudreve/v3/pkg/auth" + "github.com/cloudreve/Cloudreve/v3/pkg/cluster" + "github.com/cloudreve/Cloudreve/v3/pkg/serializer" + "sync" +) + +var DefaultController Controller + +// Controller controls communications between master and slave +type Controller interface { + // Handle heartbeat sent from master + HandleHeartBeat(*serializer.NodePingReq) (serializer.NodePingResp, error) +} + +type slaveController struct { + 
masters map[string]masterInfo + lock sync.RWMutex +} + +// info of master node +type masterInfo struct { + slaveID uint + url string + authClient auth.Auth + // used to invoke aria2 rpc calls + instance cluster.Node +} + +func Init() { + DefaultController = &slaveController{ + masters: make(map[string]masterInfo), + } +} + +func (c *slaveController) HandleHeartBeat(req *serializer.NodePingReq) (serializer.NodePingResp, error) { + c.lock.Lock() + defer c.lock.Unlock() + + req.Node.AfterFind() + + // close old node if exist + origin, ok := c.masters[req.MasterURL] + + if (ok && req.IsUpdate) || !ok { + if ok { + origin.instance.Kill() + } + + c.masters[req.MasterURL] = masterInfo{ + slaveID: req.Node.ID, + url: req.MasterURL, + authClient: auth.HMACAuth{ + SecretKey: []byte(req.Node.MasterKey), + }, + instance: cluster.NewNodeFromDBModel(&model.Node{ + Type: model.MasterNodeType, + Aria2Enabled: req.Node.Aria2Enabled, + Aria2OptionsSerialized: req.Node.Aria2OptionsSerialized, + }), + } + } + + return serializer.NodePingResp{}, nil +} diff --git a/service/node/fabric.go b/service/node/fabric.go index 78dfc307..959e0464 100644 --- a/service/node/fabric.go +++ b/service/node/fabric.go @@ -1,10 +1,18 @@ package node -import "github.com/cloudreve/Cloudreve/v3/pkg/serializer" +import ( + "github.com/cloudreve/Cloudreve/v3/pkg/serializer" + "github.com/cloudreve/Cloudreve/v3/pkg/slave" +) func HandleMasterHeartbeat(req *serializer.NodePingReq) serializer.Response { + res, err := slave.DefaultController.HandleHeartBeat(req) + if err != nil { + return serializer.Err(serializer.CodeInternalSetting, "无法初始化从机控制器", err) + } + return serializer.Response{ Code: 0, - Data: serializer.NodePingResp{}, + Data: res, } }