diff --git a/routine/options.go b/routine/options.go index 8ca94bc..2b9850a 100644 --- a/routine/options.go +++ b/routine/options.go @@ -1,6 +1,8 @@ package routine -type options[T any] struct { +type T any + +type options struct { workers int capacity int taskFn func(T) @@ -8,11 +10,11 @@ type options[T any] struct { } // Option function -type Option[T any] func(*options[T]) +type Option func(*options) -// setOptions 设置默认值 -func setOptions[T any](opt ...Option[T]) options[T] { - opts := options[T]{ +// loadOptions 设置默认值 +func loadOptions(opt ...Option) options { + opts := options{ workers: 1, capacity: 1, } @@ -25,35 +27,35 @@ func setOptions[T any](opt ...Option[T]) options[T] { } // WithWorkers 设置协程数 -func WithWorkers[T any](workers int) Option[T] { +func WithWorkers(workers int) Option { if workers <= 0 { workers = 1 } - return func(o *options[T]) { + return func(o *options) { o.workers = workers } } // WithCapacity 设置任务队列容量 -func WithCapacity[T any](capacity int) Option[T] { +func WithCapacity(capacity int) Option { if capacity <= 0 { capacity = 1 } - return func(o *options[T]) { + return func(o *options) { o.capacity = capacity } } // WithTaskFn 设置任务函数 -func WithTaskFn[T any](taskFn func(T)) Option[T] { - return func(o *options[T]) { +func WithTaskFn(taskFn func(T)) Option { + return func(o *options) { o.taskFn = taskFn } } // WithPanicHandler 设置panic处理函数 -func WithPanicHandler[T any](panicHandler func(any)) Option[T] { - return func(o *options[T]) { +func WithPanicHandler(panicHandler func(any)) Option { + return func(o *options) { o.panicHandler = panicHandler } } diff --git a/routine/pool.go b/routine/pool.go index d7bed9c..aef4ee1 100644 --- a/routine/pool.go +++ b/routine/pool.go @@ -4,7 +4,7 @@ import ( "sync" ) -type Pool[T any] struct { +type Pool struct { taskQueue chan T taskFn func(T) workers int @@ -12,9 +12,9 @@ type Pool[T any] struct { wg sync.WaitGroup } -func NewPool[T any](opt ...Option[T]) *Pool[T] { - opts := setOptions(opt...) - pool := &Pool[T]{ +func NewPool(opt ...Option) *Pool { + opts := loadOptions(opt...) + pool := &Pool{ taskQueue: make(chan T, opts.capacity), panicHandler: opts.panicHandler, workers: opts.workers, @@ -37,7 +37,7 @@ func NewPool[T any](opt ...Option[T]) *Pool[T] { } // Start 启动任务 -func (p *Pool[T]) Start() { +func (p *Pool) Start() { for i := 0; i < p.workers; i++ { go func() { defer p.wg.Done() @@ -55,12 +55,12 @@ func (p *Pool[T]) Start() { } // Push 提交任务 -func (p *Pool[T]) Push(task T) { +func (p *Pool) Push(task T) { p.taskQueue <- task } // Wait 挂起当前协程 -func (p *Pool[T]) Wait() { +func (p *Pool) Wait() { close(p.taskQueue) p.wg.Wait() } diff --git a/routine/pool_test.go b/routine/pool_test.go index f29a52e..0044e55 100644 --- a/routine/pool_test.go +++ b/routine/pool_test.go @@ -14,7 +14,8 @@ func TestPool(t *testing.T) { num := runtime.NumCPU() var sum atomic.Int32 - task := func(num int32) { + task := func(t T) { + num := t.(int32) if num < 0 { panic("unable to handle negative numbers") } @@ -25,10 +26,10 @@ func TestPool(t *testing.T) { fmt.Printf("Panic: %v\n %s", r, string(debug.Stack())) } pool := NewPool( - WithWorkers[int32](num), - WithCapacity[int32](num), + WithWorkers(num), + WithCapacity(num), WithTaskFn(task), - WithPanicHandler[int32](handler), + WithPanicHandler(handler), ) pool.Start()