|
|
|
|
package task
|
|
|
|
|
|
|
|
|
|
import "sync"
|
|
|
|
|
|
|
|
|
|
// Pool 带有最大配额的goroutines任务池
|
|
|
|
|
type Pool struct {
|
|
|
|
|
// 容量
|
|
|
|
|
capacity int
|
|
|
|
|
// 初始容量
|
|
|
|
|
initialCapacity int
|
|
|
|
|
|
|
|
|
|
// 终止信号
|
|
|
|
|
terminateSignal chan error
|
|
|
|
|
// 全部任务完成的信号
|
|
|
|
|
finishSignal chan bool
|
|
|
|
|
// 有空余位置的信号
|
|
|
|
|
freeSignal chan bool
|
|
|
|
|
|
|
|
|
|
// 是否已关闭
|
|
|
|
|
closed bool
|
|
|
|
|
// 是否正在等待任务结束
|
|
|
|
|
waiting bool
|
|
|
|
|
|
|
|
|
|
// 互斥锁
|
|
|
|
|
lock sync.Mutex
|
|
|
|
|
// 等待队列
|
|
|
|
|
pending []Job
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Job 任务
|
|
|
|
|
type Job interface {
|
|
|
|
|
// 任务处理方法,如果error不为nil,
|
|
|
|
|
// 任务池会关闭并中止接受新任务
|
|
|
|
|
Do() error
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// NewGoroutinePool 创建一个容量为capacity的任务池
|
|
|
|
|
func NewGoroutinePool(capacity int) *Pool {
|
|
|
|
|
pool := &Pool{
|
|
|
|
|
capacity: capacity,
|
|
|
|
|
initialCapacity: capacity,
|
|
|
|
|
terminateSignal: make(chan error),
|
|
|
|
|
finishSignal: make(chan bool),
|
|
|
|
|
freeSignal: make(chan bool),
|
|
|
|
|
}
|
|
|
|
|
go pool.Schedule()
|
|
|
|
|
return pool
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Schedule 为等待队列的任务分配Worker,以及检测错误状态、所有任务完成
|
|
|
|
|
func (pool *Pool) Schedule() {
|
|
|
|
|
for {
|
|
|
|
|
select {
|
|
|
|
|
case <-pool.freeSignal:
|
|
|
|
|
// 有新的空余名额
|
|
|
|
|
pool.lock.Lock()
|
|
|
|
|
if len(pool.pending) > 0 {
|
|
|
|
|
// 有待处理的任务,开始处理
|
|
|
|
|
var job Job
|
|
|
|
|
job, pool.pending = pool.pending[0], pool.pending[1:]
|
|
|
|
|
go pool.start(job)
|
|
|
|
|
} else {
|
|
|
|
|
if pool.waiting && pool.capacity == pool.initialCapacity {
|
|
|
|
|
// 所有任务已结束
|
|
|
|
|
pool.lock.Unlock()
|
|
|
|
|
pool.finishSignal <- true
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
pool.lock.Unlock()
|
|
|
|
|
}
|
|
|
|
|
case <-pool.terminateSignal:
|
|
|
|
|
// 有任务意外中止,则发送完成信号
|
|
|
|
|
pool.finishSignal <- true
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Wait 等待队列中所有任务完成或有Job返回错误中止
|
|
|
|
|
func (pool *Pool) Wait() chan bool {
|
|
|
|
|
pool.lock.Lock()
|
|
|
|
|
pool.waiting = true
|
|
|
|
|
pool.lock.Unlock()
|
|
|
|
|
return pool.finishSignal
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Submit 提交新任务
|
|
|
|
|
func (pool *Pool) Submit(job Job) {
|
|
|
|
|
if pool.closed {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
pool.lock.Lock()
|
|
|
|
|
if pool.capacity < 1 {
|
|
|
|
|
// 容量为空时,加入等待队列
|
|
|
|
|
pool.pending = append(pool.pending, job)
|
|
|
|
|
pool.lock.Unlock()
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// 还有空闲容量时,开始执行任务
|
|
|
|
|
go pool.start(job)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// 开始执行任务
|
|
|
|
|
func (pool *Pool) start(job Job) {
|
|
|
|
|
pool.capacity--
|
|
|
|
|
pool.lock.Unlock()
|
|
|
|
|
|
|
|
|
|
err := job.Do()
|
|
|
|
|
if err != nil {
|
|
|
|
|
pool.closed = true
|
|
|
|
|
select {
|
|
|
|
|
case <-pool.terminateSignal:
|
|
|
|
|
default:
|
|
|
|
|
close(pool.terminateSignal)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
pool.lock.Lock()
|
|
|
|
|
pool.capacity++
|
|
|
|
|
pool.lock.Unlock()
|
|
|
|
|
pool.freeSignal <- true
|
|
|
|
|
}
|