diff --git a/bootstrap/init.go b/bootstrap/init.go index 98f2c8da..775ecd12 100644 --- a/bootstrap/init.go +++ b/bootstrap/init.go @@ -5,6 +5,7 @@ import ( "github.com/cloudreve/Cloudreve/v3/pkg/aria2" "github.com/cloudreve/Cloudreve/v3/pkg/auth" "github.com/cloudreve/Cloudreve/v3/pkg/cache" + "github.com/cloudreve/Cloudreve/v3/pkg/cluster" "github.com/cloudreve/Cloudreve/v3/pkg/conf" "github.com/cloudreve/Cloudreve/v3/pkg/crontab" "github.com/cloudreve/Cloudreve/v3/pkg/email" @@ -24,6 +25,7 @@ func Init(path string) { if conf.SystemConfig.Mode == "master" { model.Init() task.Init() + cluster.Init() aria2.Init(false) email.Init() crontab.Init() diff --git a/models/migration.go b/models/migration.go index 42fb14df..2a78cb0d 100644 --- a/models/migration.go +++ b/models/migration.go @@ -34,8 +34,9 @@ func migration() { if conf.DatabaseConfig.Type == "mysql" { DB = DB.Set("gorm:table_options", "ENGINE=InnoDB") } + DB.AutoMigrate(&User{}, &Setting{}, &Group{}, &Policy{}, &Folder{}, &File{}, &Share{}, - &Task{}, &Download{}, &Tag{}, &Webdav{}) + &Task{}, &Download{}, &Tag{}, &Webdav{}, &Node{}) // 创建初始存储策略 addDefaultPolicy() diff --git a/models/node.go b/models/node.go new file mode 100644 index 00000000..40371e72 --- /dev/null +++ b/models/node.go @@ -0,0 +1,34 @@ +package model + +import "github.com/jinzhu/gorm" + +// Node 从机节点信息模型 +type Node struct { + gorm.Model + Status NodeStatus // 任务状态 + Type ModelType // 任务状态 + Server string // 服务器地址 + SecretKey string `gorm:"type:text"` // 通信密钥 + Aria2Enabled bool // 是否支持用作离线下载节点 + Aria2Options string `gorm:"type:text"` // 离线下载配置 +} + +type NodeStatus int +type ModelType int + +const ( + NodeActive = iota + NodeSuspend +) + +const ( + SlaveNodeType = iota + MasterNodeType +) + +// GetNodesByStatus 根据给定状态获取节点 +func GetNodesByStatus(status ...NodeStatus) ([]Node, error) { + var nodes []Node + result := DB.Where("status in (?)", status).Find(&nodes) + return nodes, result.Error +} diff --git a/pkg/cluster/master.go b/pkg/cluster/master.go new file mode 100644 index 00000000..278a2296 --- /dev/null +++ b/pkg/cluster/master.go @@ -0,0 +1,21 @@ +package cluster + +import ( + model "github.com/cloudreve/Cloudreve/v3/models" +) + +type MasterNode struct { + Model *model.Node +} + +// IsFeatureEnabled 查询节点的某项功能是否启用 +func (node *MasterNode) IsFeatureEnabled(feature string) bool { + switch feature { + default: + return false + } +} + +// SubscribeStatusChange 订阅节点状态更改 +func (node *MasterNode) SubscribeStatusChange(callback func()) { +} diff --git a/pkg/cluster/node.go b/pkg/cluster/node.go new file mode 100644 index 00000000..b3085686 --- /dev/null +++ b/pkg/cluster/node.go @@ -0,0 +1,21 @@ +package cluster + +import model "github.com/cloudreve/Cloudreve/v3/models" + +type Node interface { + IsFeatureEnabled(feature string) bool + SubscribeStatusChange(callback func()) +} + +func getNodeFromDBModel(node *model.Node) Node { + switch node.Type { + case model.SlaveNodeType: + return &SlaveNode{ + Model: node, + } + default: + return &MasterNode{ + Model: node, + } + } +} diff --git a/pkg/cluster/pool.go b/pkg/cluster/pool.go new file mode 100644 index 00000000..35e77ff5 --- /dev/null +++ b/pkg/cluster/pool.go @@ -0,0 +1,68 @@ +package cluster + +import ( + model "github.com/cloudreve/Cloudreve/v3/models" + "github.com/cloudreve/Cloudreve/v3/pkg/util" + "sync" +) + +var Default *NodePool + +// 需要分类的节点组 +var featureGroup = []string{"Aria2"} + +// Pool 节点池 +type Pool interface { + Select() +} + +// NodePool 通用节点池 +type NodePool struct { + nodes []Node + featureMap map[string][]Node + lock sync.RWMutex +} + +func (pool *NodePool) Select() { + +} + +// Init 初始化从机节点池 +func Init() { + Default = &NodePool{ + featureMap: make(map[string][]Node), + } + if err := Default.initFromDB(); err != nil { + util.Log().Warning("节点池初始化失败, %s", err) + } +} + +func (pool *NodePool) initFromDB() error { + nodes, err := model.GetNodesByStatus(model.NodeActive) + if err != nil { + return err + } + + pool.lock.Lock() + + for _, feature := range featureGroup { + pool.featureMap[feature] = make([]Node, 0) + } + + for i := 0; i < len(nodes); i++ { + newNode := getNodeFromDBModel(&nodes[i]) + pool.nodes = append(pool.nodes, newNode) + + for _, feature := range featureGroup { + if newNode.IsFeatureEnabled(feature) { + pool.featureMap[feature] = append(pool.featureMap[feature], newNode) + } + } + + newNode.SubscribeStatusChange(func() { + }) + } + + pool.lock.Unlock() + return nil +} diff --git a/pkg/cluster/slave.go b/pkg/cluster/slave.go new file mode 100644 index 00000000..852f039c --- /dev/null +++ b/pkg/cluster/slave.go @@ -0,0 +1,30 @@ +package cluster + +import ( + model "github.com/cloudreve/Cloudreve/v3/models" + "sync" +) + +type SlaveNode struct { + Model *model.Node + callback func() + lock sync.RWMutex +} + +// IsFeatureEnabled 查询节点的某项功能是否启用 +func (node *SlaveNode) IsFeatureEnabled(feature string) bool { + switch feature { + default: + return false + } +} + +// SubscribeStatusChange 订阅节点状态更改 +func (node *SlaveNode) SubscribeStatusChange(callback func()) { + node.lock.Lock() + node.callback = callback + node.lock.Unlock() +} + +// PingLoop +// RecoverLoop