|
|
|
|
package task
|
|
|
|
|
|
|
|
|
|
import (
|
|
|
|
|
model "github.com/cloudreve/Cloudreve/v3/models"
|
|
|
|
|
"github.com/cloudreve/Cloudreve/v3/pkg/conf"
|
|
|
|
|
"github.com/cloudreve/Cloudreve/v3/pkg/util"
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
// TaskPoll 要使用的任务池
|
|
|
|
|
var TaskPoll Pool
|
|
|
|
|
|
|
|
|
|
type Pool interface {
|
|
|
|
|
Add(num int)
|
|
|
|
|
Submit(job Job)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// AsyncPool 带有最大配额的任务池
|
|
|
|
|
type AsyncPool struct {
|
|
|
|
|
// 容量
|
|
|
|
|
idleWorker chan int
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Add 增加可用Worker数量
|
|
|
|
|
func (pool *AsyncPool) Add(num int) {
|
|
|
|
|
for i := 0; i < num; i++ {
|
|
|
|
|
pool.idleWorker <- 1
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// ObtainWorker 阻塞直到获取新的Worker
|
|
|
|
|
func (pool *AsyncPool) obtainWorker() Worker {
|
|
|
|
|
select {
|
|
|
|
|
case <-pool.idleWorker:
|
|
|
|
|
// 有空闲Worker名额时,返回新Worker
|
|
|
|
|
return &GeneralWorker{}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// FreeWorker 添加空闲Worker
|
|
|
|
|
func (pool *AsyncPool) freeWorker() {
|
|
|
|
|
pool.Add(1)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Submit 开始提交任务
|
|
|
|
|
func (pool *AsyncPool) Submit(job Job) {
|
|
|
|
|
go func() {
|
|
|
|
|
util.Log().Debug("Waiting for Worker.")
|
|
|
|
|
worker := pool.obtainWorker()
|
|
|
|
|
util.Log().Debug("Worker obtained.")
|
|
|
|
|
worker.Do(job)
|
|
|
|
|
util.Log().Debug("Worker released.")
|
|
|
|
|
pool.freeWorker()
|
|
|
|
|
}()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Init 初始化任务池
|
|
|
|
|
func Init() {
|
|
|
|
|
maxWorker := model.GetIntSetting("max_worker_num", 10)
|
|
|
|
|
TaskPoll = &AsyncPool{
|
|
|
|
|
idleWorker: make(chan int, maxWorker),
|
|
|
|
|
}
|
|
|
|
|
TaskPoll.Add(maxWorker)
|
|
|
|
|
util.Log().Info("Initialize task queue with WorkerNum = %d", maxWorker)
|
|
|
|
|
|
|
|
|
|
if conf.SystemConfig.Mode == "master" {
|
|
|
|
|
Resume(TaskPoll)
|
|
|
|
|
}
|
|
|
|
|
}
|