From edbc3db7f47c685fa738b169763021bbefd7b38a Mon Sep 17 00:00:00 2001 From: HFO4 <912394456@qq.com> Date: Thu, 19 Aug 2021 20:18:23 +0800 Subject: [PATCH] Feat: master send scheduled ping request --- models/migration.go | 2 ++ models/node.go | 1 + pkg/cluster/master.go | 7 ++++- pkg/cluster/node.go | 7 +++-- pkg/cluster/pool.go | 64 ++++++++++++++++++++++++++++++++---------- pkg/cluster/slave.go | 65 ++++++++++++++++++++++++++++++++++++------- 6 files changed, 118 insertions(+), 28 deletions(-) diff --git a/models/migration.go b/models/migration.go index 2a78cb0d..516b93f4 100644 --- a/models/migration.go +++ b/models/migration.go @@ -101,6 +101,8 @@ func addDefaultSettings() { {Name: "upload_credential_timeout", Value: `1800`, Type: "timeout"}, {Name: "upload_session_timeout", Value: `86400`, Type: "timeout"}, {Name: "slave_api_timeout", Value: `60`, Type: "timeout"}, + {Name: "slave_node_retry", Value: `3`, Type: "slave"}, + {Name: "slave_ping_interval", Value: `300`, Type: "slave"}, {Name: "onedrive_monitor_timeout", Value: `600`, Type: "timeout"}, {Name: "share_download_session_timeout", Value: `2073600`, Type: "timeout"}, {Name: "onedrive_callback_check", Value: `20`, Type: "timeout"}, diff --git a/models/node.go b/models/node.go index 8fb67918..739a98e2 100644 --- a/models/node.go +++ b/models/node.go @@ -6,6 +6,7 @@ import "github.com/jinzhu/gorm" type Node struct { gorm.Model Status NodeStatus // 节点状态 + Name string // 节点别名 Type ModelType // 节点状态 Server string // 服务器地址 SecretKey string `gorm:"type:text"` // 通信密钥 diff --git a/pkg/cluster/master.go b/pkg/cluster/master.go index 27478428..a036f46b 100644 --- a/pkg/cluster/master.go +++ b/pkg/cluster/master.go @@ -22,5 +22,10 @@ func (node *MasterNode) IsFeatureEnabled(feature string) bool { } // SubscribeStatusChange 订阅节点状态更改 -func (node *MasterNode) SubscribeStatusChange(callback func()) { +func (node *MasterNode) SubscribeStatusChange(callback func(isActive bool, id uint)) { +} + +// IsActive 返回节点是否在线 +func (node *MasterNode) IsActive() bool { + return true } diff --git a/pkg/cluster/node.go b/pkg/cluster/node.go index 7c08d5d0..40602777 100644 --- a/pkg/cluster/node.go +++ b/pkg/cluster/node.go @@ -9,18 +9,21 @@ import ( type Node interface { IsFeatureEnabled(feature string) bool - SubscribeStatusChange(callback func()) + SubscribeStatusChange(callback func(isActive bool, id uint)) Ping(req *serializer.NodePingReq) (*serializer.NodePingResp, error) + IsActive() bool } func getNodeFromDBModel(node *model.Node) Node { switch node.Type { case model.SlaveNodeType: - return &SlaveNode{ + slave := &SlaveNode{ Model: node, AuthInstance: auth.HMACAuth{SecretKey: []byte(node.SecretKey)}, Client: request.HTTPClient{}, } + go slave.StartPingLoop() + return slave default: return &MasterNode{ Model: node, diff --git a/pkg/cluster/pool.go b/pkg/cluster/pool.go index 35e77ff5..9060dcc4 100644 --- a/pkg/cluster/pool.go +++ b/pkg/cluster/pool.go @@ -18,9 +18,12 @@ type Pool interface { // NodePool 通用节点池 type NodePool struct { - nodes []Node + active map[uint]Node + inactive map[uint]Node + featureMap map[string][]Node - lock sync.RWMutex + + lock sync.RWMutex } func (pool *NodePool) Select() { @@ -37,6 +40,39 @@ func Init() { } } +func (pool *NodePool) buildIndexMap() { + pool.lock.Lock() + for _, feature := range featureGroup { + pool.featureMap[feature] = make([]Node, 0) + } + + for _, v := range pool.active { + for _, feature := range featureGroup { + if v.IsFeatureEnabled(feature) { + pool.featureMap[feature] = append(pool.featureMap[feature], v) + } + } + } + pool.lock.Unlock() +} + +func (pool *NodePool) nodeStatusChange(isActive bool, id uint) { + util.Log().Debug("从机节点 [ID=%d] 状态变更 [active=%t]", id, isActive) + pool.lock.Lock() + if isActive { + node := pool.inactive[id] + delete(pool.inactive, id) + pool.active[id] = node + } else { + node := pool.active[id] + delete(pool.active, id) + pool.inactive[id] = node + } + pool.lock.Unlock() + + pool.buildIndexMap() +} + func (pool *NodePool) initFromDB() error { nodes, err := model.GetNodesByStatus(model.NodeActive) if err != nil { @@ -44,25 +80,23 @@ func (pool *NodePool) initFromDB() error { } pool.lock.Lock() - - for _, feature := range featureGroup { - pool.featureMap[feature] = make([]Node, 0) - } - + pool.active = make(map[uint]Node) + pool.inactive = make(map[uint]Node) for i := 0; i < len(nodes); i++ { newNode := getNodeFromDBModel(&nodes[i]) - pool.nodes = append(pool.nodes, newNode) - - for _, feature := range featureGroup { - if newNode.IsFeatureEnabled(feature) { - pool.featureMap[feature] = append(pool.featureMap[feature], newNode) - } + if newNode.IsActive() { + pool.active[nodes[i].ID] = newNode + } else { + pool.inactive[nodes[i].ID] = newNode } - newNode.SubscribeStatusChange(func() { + // 订阅节点状态变更 + newNode.SubscribeStatusChange(func(isActive bool, id uint) { + pool.nodeStatusChange(isActive, id) }) } - pool.lock.Unlock() + + pool.buildIndexMap() return nil } diff --git a/pkg/cluster/slave.go b/pkg/cluster/slave.go index 64ab2696..65aacce4 100644 --- a/pkg/cluster/slave.go +++ b/pkg/cluster/slave.go @@ -1,13 +1,13 @@ package cluster import ( - "context" "encoding/json" "errors" model "github.com/cloudreve/Cloudreve/v3/models" "github.com/cloudreve/Cloudreve/v3/pkg/auth" "github.com/cloudreve/Cloudreve/v3/pkg/request" "github.com/cloudreve/Cloudreve/v3/pkg/serializer" + "github.com/cloudreve/Cloudreve/v3/pkg/util" "net/url" "path" "strings" @@ -20,8 +20,9 @@ type SlaveNode struct { AuthInstance auth.Auth Client request.Client - callback func() - ctx context.Context + active bool + callback func(bool, uint) + close chan bool lock sync.RWMutex } @@ -34,7 +35,7 @@ func (node *SlaveNode) IsFeatureEnabled(feature string) bool { } // SubscribeStatusChange 订阅节点状态更改 -func (node *SlaveNode) SubscribeStatusChange(callback func()) { +func (node *SlaveNode) SubscribeStatusChange(callback func(bool, uint)) { node.lock.Lock() node.callback = callback node.lock.Unlock() @@ -77,6 +78,11 @@ func (node *SlaveNode) Ping(req *serializer.NodePingReq) (*serializer.NodePingRe return &res, nil } +// IsActive 返回节点是否在线 +func (node *SlaveNode) IsActive() bool { + return node.active +} + // getAPIUrl 获取接口请求地址 func (node *SlaveNode) getAPIUrl(scope string) string { node.lock.RLock() @@ -92,16 +98,55 @@ func (node *SlaveNode) getAPIUrl(scope string) string { return serverURL.ResolveReference(controller).String() } -func (node *SlaveNode) pingLoop() { - t := time.NewTicker(time.Second) +func (node *SlaveNode) changeStatus(isActive bool) { + node.lock.RLock() + id := node.Model.ID + if isActive != node.active { + node.lock.RUnlock() + node.lock.Lock() + node.active = isActive + node.lock.Unlock() + node.callback(isActive, id) + } else { + node.lock.RUnlock() + } + +} + +func (node *SlaveNode) StartPingLoop() { + node.lock.Lock() + node.close = make(chan bool) + node.active = true + node.lock.Unlock() + + t := time.NewTicker(time.Duration(model.GetIntSetting("slave_ping_interval", 300)) * time.Second) + defer t.Stop() + + util.Log().Debug("从机节点 [%s] 启动心跳循环", node.Model.Name) + retry := 0 +loop: for { select { case <-t.C: - case <-node.ctx.Done(): + util.Log().Debug("从机节点 [%s] 发送Ping", node.Model.Name) + res, err := node.Ping(&serializer.NodePingReq{}) + if err != nil { + util.Log().Debug("Ping从机节点 [%s] 时发生错误: %s", node.Model.Name, err) + retry++ + if retry > model.GetIntSetting("slave_node_retry", 3) { + util.Log().Debug("从机节点 [%s] Ping 重试已达到最大限制,将从机节点标记为不可用", node.Model.Name) + node.changeStatus(false) + break loop + } + break + } + + util.Log().Debug("从机节点 [%s] 状态: %s", node.Model.Name, res) + node.changeStatus(true) + case <-node.close: + util.Log().Debug("从机节点 [%s] 收到关闭信号", node.Model.Name) + break loop } } } - -// PingLoop -// RecoverLoop