diff --git a/lock_free/delay_queue.go b/lock_free/delay_queue.go index 2531aee..ad7ee5f 100644 --- a/lock_free/delay_queue.go +++ b/lock_free/delay_queue.go @@ -1,6 +1,9 @@ package lock_free -import "time" +import ( + "iter" + "time" +) type DelayLkQueue[T any] struct { LkQueue[T] @@ -18,15 +21,33 @@ func (q *DelayLkQueue[T]) DelayEnqueue(value T, duration time.Duration) { }) } -// ContinuousDequeue 持续监听出队通知 -func (q *DelayLkQueue[T]) ContinuousDequeue(notify ...chan T) { - for { - if value, ok := q.Dequeue(); ok { - for _, n := range notify { - n <- value +// ContinuousDequeue 持续监听出队 +func (q *DelayLkQueue[T]) ContinuousDequeue() iter.Seq[T] { + return func(yield func(T) bool) { + for { + if value, ok := q.Dequeue(); ok { + if !yield(value) { + return + } + } else { + time.Sleep(time.Millisecond) // 队列为空,休眠1毫秒 } - } else { - time.Sleep(time.Millisecond) // 队列为空,休眠1毫秒 } } } + +// ContinuousDequeueExecute 持续监听出队执行函数 +func (q *DelayLkQueue[T]) ContinuousDequeueExecute(fn func(T)) { + for value := range q.ContinuousDequeue() { + fn(value) + } +} + +// ContinuousDequeueNotify 持续监听出队通知 +func (q *DelayLkQueue[T]) ContinuousDequeueNotify(chs ...chan T) { + q.ContinuousDequeueExecute(func(value T) { + for _, ch := range chs { + ch <- value + } + }) +} diff --git a/lock_free/delay_queue_test.go b/lock_free/delay_queue_test.go index 0bce92e..4cf151f 100644 --- a/lock_free/delay_queue_test.go +++ b/lock_free/delay_queue_test.go @@ -27,7 +27,7 @@ func TestDelayLkQueue(t *testing.T) { } }() - go q.ContinuousDequeue(notify) + go q.ContinuousDequeueNotify(notify) time.Sleep(time.Second * 5) close(notify) }