From 0ed7839479d87cec83e7e3feea5c2b046701c617 Mon Sep 17 00:00:00 2001 From: HFO4 <912394456@qq.com> Date: Fri, 29 Oct 2021 20:24:07 +0800 Subject: [PATCH] Feat: add and delete node in node pool --- models/migration.go | 4 +-- models/node.go | 1 + pkg/cluster/pool.go | 66 ++++++++++++++++++++++++++++++------ routers/controllers/admin.go | 11 ++++++ routers/router.go | 2 ++ service/admin/node.go | 12 +++++++ 6 files changed, 83 insertions(+), 13 deletions(-) diff --git a/models/migration.go b/models/migration.go index 8bbca38d..de4a209f 100644 --- a/models/migration.go +++ b/models/migration.go @@ -106,8 +106,8 @@ func addDefaultSettings() { {Name: "upload_session_timeout", Value: `86400`, Type: "timeout"}, {Name: "slave_api_timeout", Value: `60`, Type: "timeout"}, {Name: "slave_node_retry", Value: `3`, Type: "slave"}, - {Name: "slave_ping_interval", Value: `300`, Type: "slave"}, - {Name: "slave_recover_interval", Value: `600`, Type: "slave"}, + {Name: "slave_ping_interval", Value: `60`, Type: "slave"}, + {Name: "slave_recover_interval", Value: `120`, Type: "slave"}, {Name: "slave_transfer_timeout", Value: `172800`, Type: "timeout"}, {Name: "onedrive_monitor_timeout", Value: `600`, Type: "timeout"}, {Name: "share_download_session_timeout", Value: `2073600`, Type: "timeout"}, diff --git a/models/node.go b/models/node.go index 2eebf5c4..977c8b51 100644 --- a/models/node.go +++ b/models/node.go @@ -16,6 +16,7 @@ type Node struct { MasterKey string `gorm:"type:text"` // 从->主 通信密钥 Aria2Enabled bool // 是否支持用作离线下载节点 Aria2Options string `gorm:"type:text"` // 离线下载配置 + Rank int // 负载均衡权重 // 数据库忽略字段 Aria2OptionsSerialized Aria2Option `gorm:"-"` diff --git a/pkg/cluster/pool.go b/pkg/cluster/pool.go index 9893dcf6..131fcf16 100644 --- a/pkg/cluster/pool.go +++ b/pkg/cluster/pool.go @@ -19,6 +19,12 @@ type Pool interface { // Returns node by ID GetNodeByID(id uint) Node + + // Add given node into pool. If node existed, refresh node. + Add(node *model.Node) + + // Delete and kill node from pool by given node id + Delete(id uint) } // NodePool 通用节点池 @@ -95,17 +101,7 @@ func (pool *NodePool) initFromDB() error { pool.active = make(map[uint]Node) pool.inactive = make(map[uint]Node) for i := 0; i < len(nodes); i++ { - newNode := NewNodeFromDBModel(&nodes[i]) - if newNode.IsActive() { - pool.active[nodes[i].ID] = newNode - } else { - pool.inactive[nodes[i].ID] = newNode - } - - // 订阅节点状态变更 - newNode.SubscribeStatusChange(func(isActive bool, id uint) { - pool.nodeStatusChange(isActive, id) - }) + pool.add(&nodes[i]) } pool.lock.Unlock() @@ -113,6 +109,54 @@ func (pool *NodePool) initFromDB() error { return nil } +func (pool *NodePool) add(node *model.Node) { + newNode := NewNodeFromDBModel(node) + if newNode.IsActive() { + pool.active[node.ID] = newNode + } else { + pool.inactive[node.ID] = newNode + } + + // 订阅节点状态变更 + newNode.SubscribeStatusChange(func(isActive bool, id uint) { + pool.nodeStatusChange(isActive, id) + }) +} + +func (pool *NodePool) Add(node *model.Node) { + pool.lock.Lock() + defer pool.lock.Unlock() + + if _, ok := pool.active[node.ID]; ok { + // TODO: refresh node + return + } + + if _, ok := pool.inactive[node.ID]; ok { + return + } + + pool.add(node) +} + +func (pool *NodePool) Delete(id uint) { + pool.lock.Lock() + defer pool.lock.Unlock() + + if node, ok := pool.active[id]; ok { + node.Kill() + delete(pool.active, id) + return + } + + if node, ok := pool.inactive[id]; ok { + node.Kill() + delete(pool.inactive, id) + return + } + +} + // BalanceNodeByFeature 根据 feature 和 LoadBalancer 取出节点 func (pool *NodePool) BalanceNodeByFeature(feature string, lb balancer.Balancer) (error, Node) { pool.lock.RLock() diff --git a/routers/controllers/admin.go b/routers/controllers/admin.go index aab8a40b..e0d411ba 100644 --- a/routers/controllers/admin.go +++ b/routers/controllers/admin.go @@ -454,3 +454,14 @@ func AdminAddNode(c *gin.Context) { c.JSON(200, ErrorResponse(err)) } } + +// AdminToggleNode 启用/暂停节点 +func AdminToggleNode(c *gin.Context) { + var service admin.ToggleNodeService + if err := c.ShouldBindUri(&service); err == nil { + res := service.Toggle() + c.JSON(200, res) + } else { + c.JSON(200, ErrorResponse(err)) + } +} diff --git a/routers/router.go b/routers/router.go index 01071d34..c620b4d1 100644 --- a/routers/router.go +++ b/routers/router.go @@ -452,6 +452,8 @@ func InitMasterRouter() *gin.Engine { node.POST("aria2/test", controllers.AdminTestAria2) // 创建/保存节点 node.POST("", controllers.AdminAddNode) + // 启用/暂停节点 + node.PATCH("enable/:id/:desired", controllers.AdminToggleNode) } } diff --git a/service/admin/node.go b/service/admin/node.go index eaf8abba..1e7b2162 100644 --- a/service/admin/node.go +++ b/service/admin/node.go @@ -69,3 +69,15 @@ func (service *AdminListService) Nodes() serializer.Response { "active": isActive, }} } + +// ToggleNodeService 开关节点服务 +type ToggleNodeService struct { + ID uint `uri:"id"` + Desired int `uri:"desired"` +} + +// Toggle 开关节点 +func (service *ToggleNodeService) Toggle() serializer.Response { + + return serializer.Response{} +}