package routine import ( "sync" ) type Pool struct { taskQueue chan T taskFn func(T) workers int panicHandler func(any) wg sync.WaitGroup } func NewPool(opt ...Option) *Pool { opts := loadOptions(opt...) pool := &Pool{ taskQueue: make(chan T, opts.capacity), panicHandler: opts.panicHandler, workers: opts.workers, } pool.taskFn = func(t T) { defer func() { // 处理协程运行中出现panic的情况 if r := recover(); r != nil { if pool.panicHandler != nil { pool.panicHandler(r) } } }() opts.taskFn(t) } pool.wg.Add(opts.workers) return pool } // Start 启动任务 func (p *Pool) Start() { for i := 0; i < p.workers; i++ { go func() { defer p.wg.Done() for { task, ok := <-p.taskQueue if !ok { return } p.taskFn(task) } }() } } // Push 提交任务 func (p *Pool) Push(task T) { p.taskQueue <- task } // Wait 挂起当前协程 func (p *Pool) Wait() { close(p.taskQueue) p.wg.Wait() }