Feat: recovery loop for inactive nodes

pull/1040/head
HFO4 4 years ago
parent edbc3db7f4
commit 154bf36a33

@ -103,6 +103,7 @@ func addDefaultSettings() {
{Name: "slave_api_timeout", Value: `60`, Type: "timeout"}, {Name: "slave_api_timeout", Value: `60`, Type: "timeout"},
{Name: "slave_node_retry", Value: `3`, Type: "slave"}, {Name: "slave_node_retry", Value: `3`, Type: "slave"},
{Name: "slave_ping_interval", Value: `300`, Type: "slave"}, {Name: "slave_ping_interval", Value: `300`, Type: "slave"},
{Name: "slave_recover_interval", Value: `600`, Type: "slave"},
{Name: "onedrive_monitor_timeout", Value: `600`, Type: "timeout"}, {Name: "onedrive_monitor_timeout", Value: `600`, Type: "timeout"},
{Name: "share_download_session_timeout", Value: `2073600`, Type: "timeout"}, {Name: "share_download_session_timeout", Value: `2073600`, Type: "timeout"},
{Name: "onedrive_callback_check", Value: `20`, Type: "timeout"}, {Name: "onedrive_callback_check", Value: `20`, Type: "timeout"},

@ -21,6 +21,7 @@ func getNodeFromDBModel(node *model.Node) Node {
Model: node, Model: node,
AuthInstance: auth.HMACAuth{SecretKey: []byte(node.SecretKey)}, AuthInstance: auth.HMACAuth{SecretKey: []byte(node.SecretKey)},
Client: request.HTTPClient{}, Client: request.HTTPClient{},
Active: true,
} }
go slave.StartPingLoop() go slave.StartPingLoop()
return slave return slave

@ -57,7 +57,7 @@ func (pool *NodePool) buildIndexMap() {
} }
func (pool *NodePool) nodeStatusChange(isActive bool, id uint) { func (pool *NodePool) nodeStatusChange(isActive bool, id uint) {
util.Log().Debug("从机节点 [ID=%d] 状态变更 [active=%t]", id, isActive) util.Log().Debug("从机节点 [ID=%d] 状态变更 [Active=%t]", id, isActive)
pool.lock.Lock() pool.lock.Lock()
if isActive { if isActive {
node := pool.inactive[id] node := pool.inactive[id]

@ -19,8 +19,8 @@ type SlaveNode struct {
Model *model.Node Model *model.Node
AuthInstance auth.Auth AuthInstance auth.Auth
Client request.Client Client request.Client
Active bool
active bool
callback func(bool, uint) callback func(bool, uint)
close chan bool close chan bool
lock sync.RWMutex lock sync.RWMutex
@ -80,7 +80,7 @@ func (node *SlaveNode) Ping(req *serializer.NodePingReq) (*serializer.NodePingRe
// IsActive 返回节点是否在线 // IsActive 返回节点是否在线
func (node *SlaveNode) IsActive() bool { func (node *SlaveNode) IsActive() bool {
return node.active return node.Active
} }
// getAPIUrl 获取接口请求地址 // getAPIUrl 获取接口请求地址
@ -101,10 +101,10 @@ func (node *SlaveNode) getAPIUrl(scope string) string {
func (node *SlaveNode) changeStatus(isActive bool) { func (node *SlaveNode) changeStatus(isActive bool) {
node.lock.RLock() node.lock.RLock()
id := node.Model.ID id := node.Model.ID
if isActive != node.active { if isActive != node.Active {
node.lock.RUnlock() node.lock.RUnlock()
node.lock.Lock() node.lock.Lock()
node.active = isActive node.Active = isActive
node.lock.Unlock() node.lock.Unlock()
node.callback(isActive, id) node.callback(isActive, id)
} else { } else {
@ -116,34 +116,51 @@ func (node *SlaveNode) changeStatus(isActive bool) {
func (node *SlaveNode) StartPingLoop() { func (node *SlaveNode) StartPingLoop() {
node.lock.Lock() node.lock.Lock()
node.close = make(chan bool) node.close = make(chan bool)
node.active = true
node.lock.Unlock() node.lock.Unlock()
t := time.NewTicker(time.Duration(model.GetIntSetting("slave_ping_interval", 300)) * time.Second) tickDuration := time.Duration(model.GetIntSetting("slave_ping_interval", 300)) * time.Second
defer t.Stop() recoverDuration := time.Duration(model.GetIntSetting("slave_recover_interval", 600)) * time.Second
pingTicker := time.NewTicker(tickDuration)
defer pingTicker.Stop()
util.Log().Debug("从机节点 [%s] 启动心跳循环", node.Model.Name) util.Log().Debug("从机节点 [%s] 启动心跳循环", node.Model.Name)
retry := 0 retry := 0
recoverMode := false
loop: loop:
for { for {
select { select {
case <-t.C: case <-pingTicker.C:
util.Log().Debug("从机节点 [%s] 发送Ping", node.Model.Name) util.Log().Debug("从机节点 [%s] 发送Ping", node.Model.Name)
res, err := node.Ping(&serializer.NodePingReq{}) res, err := node.Ping(&serializer.NodePingReq{})
if err != nil { if err != nil {
util.Log().Debug("Ping从机节点 [%s] 时发生错误: %s", node.Model.Name, err) util.Log().Debug("Ping从机节点 [%s] 时发生错误: %s", node.Model.Name, err)
retry++ retry++
if retry > model.GetIntSetting("slave_node_retry", 3) { if retry >= model.GetIntSetting("slave_node_retry", 3) {
util.Log().Debug("从机节点 [%s] Ping 重试已达到最大限制,将从机节点标记为不可用", node.Model.Name) util.Log().Debug("从机节点 [%s] Ping 重试已达到最大限制,将从机节点标记为不可用", node.Model.Name)
node.changeStatus(false) node.changeStatus(false)
break loop
if !recoverMode {
// 启动恢复监控循环
util.Log().Debug("从机节点 [%s] 进入恢复模式", node.Model.Name)
pingTicker.Stop()
pingTicker = time.NewTicker(recoverDuration)
recoverMode = true
}
} }
break } else {
if recoverMode {
util.Log().Debug("从机节点 [%s] 复活", node.Model.Name)
pingTicker.Stop()
pingTicker = time.NewTicker(tickDuration)
recoverMode = false
} }
util.Log().Debug("从机节点 [%s] 状态: %s", node.Model.Name, res) util.Log().Debug("从机节点 [%s] 状态: %s", node.Model.Name, res)
node.changeStatus(true) node.changeStatus(true)
retry = 0
}
case <-node.close: case <-node.close:
util.Log().Debug("从机节点 [%s] 收到关闭信号", node.Model.Name) util.Log().Debug("从机节点 [%s] 收到关闭信号", node.Model.Name)
break loop break loop

Loading…
Cancel
Save