You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
cloudreve/pkg/task/pool.go

125 lines
2.4 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

package task
import "sync"
// Pool 带有最大配额的goroutines任务池
type Pool struct {
// 容量
capacity int
// 初始容量
initialCapacity int
// 终止信号
terminateSignal chan error
// 全部任务完成的信号
finishSignal chan bool
// 有空余位置的信号
freeSignal chan bool
// 是否已关闭
closed bool
// 是否正在等待任务结束
waiting bool
// 互斥锁
lock sync.Mutex
// 等待队列
pending []Job
}
// Job 任务
type Job interface {
// 任务处理方法如果error不为nil
// 任务池会关闭并中止接受新任务
Do() error
}
// NewGoroutinePool 创建一个容量为capacity的任务池
func NewGoroutinePool(capacity int) *Pool {
pool := &Pool{
capacity: capacity,
initialCapacity: capacity,
terminateSignal: make(chan error),
finishSignal: make(chan bool),
freeSignal: make(chan bool),
}
go pool.Schedule()
return pool
}
// Schedule 为等待队列的任务分配Worker以及检测错误状态、所有任务完成
func (pool *Pool) Schedule() {
for {
select {
case <-pool.freeSignal:
// 有新的空余名额
pool.lock.Lock()
if len(pool.pending) > 0 {
// 有待处理的任务,开始处理
var job Job
job, pool.pending = pool.pending[0], pool.pending[1:]
go pool.start(job)
} else {
if pool.waiting && pool.capacity == pool.initialCapacity {
// 所有任务已结束
pool.lock.Unlock()
pool.finishSignal <- true
return
}
pool.lock.Unlock()
}
case <-pool.terminateSignal:
// 有任务意外中止,则发送完成信号
pool.finishSignal <- true
return
}
}
}
// Wait 等待队列中所有任务完成或有Job返回错误中止
func (pool *Pool) Wait() chan bool {
pool.lock.Lock()
pool.waiting = true
pool.lock.Unlock()
return pool.finishSignal
}
// Submit 提交新任务
func (pool *Pool) Submit(job Job) {
if pool.closed {
return
}
pool.lock.Lock()
if pool.capacity < 1 {
// 容量为空时,加入等待队列
pool.pending = append(pool.pending, job)
pool.lock.Unlock()
return
}
// 还有空闲容量时,开始执行任务
go pool.start(job)
}
// 开始执行任务
func (pool *Pool) start(job Job) {
pool.capacity--
pool.lock.Unlock()
err := job.Do()
if err != nil {
pool.closed = true
select {
case <-pool.terminateSignal:
default:
close(pool.terminateSignal)
}
}
pool.lock.Lock()
pool.capacity++
pool.lock.Unlock()
pool.freeSignal <- true
}