From 49ebe592b31285bc260408457e26f5793ff24060 Mon Sep 17 00:00:00 2001 From: fantasticbin Date: Mon, 9 Dec 2024 10:50:31 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E9=98=9F=E5=88=97=E9=9B=86?= =?UTF-8?q?=E5=90=88?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- lock_free/delay_queue.go | 40 ++++++--- lock_free/queues.go | 86 +++++++++++++++++++ .../{delay_queue_test.go => queues_test.go} | 9 +- 3 files changed, 121 insertions(+), 14 deletions(-) create mode 100644 lock_free/queues.go rename lock_free/{delay_queue_test.go => queues_test.go} (66%) diff --git a/lock_free/delay_queue.go b/lock_free/delay_queue.go index ad7ee5f..fc60209 100644 --- a/lock_free/delay_queue.go +++ b/lock_free/delay_queue.go @@ -5,25 +5,45 @@ import ( "time" ) -type DelayLkQueue[T any] struct { - LkQueue[T] +type DelayLkQueue[TValue any, TKey comparable] struct { + timers map[TKey]*time.Timer + LkQueue[TValue] } // NewDelayLkQueue 创建延迟无锁队列 -func NewDelayLkQueue[T any]() *DelayLkQueue[T] { - return &DelayLkQueue[T]{*NewLkQueue[T]()} +func NewDelayLkQueue[TValue any, TKey comparable]() *DelayLkQueue[TValue, TKey] { + return &DelayLkQueue[TValue, TKey]{make(map[TKey]*time.Timer), *NewLkQueue[TValue]()} } // DelayEnqueue 延迟入队 -func (q *DelayLkQueue[T]) DelayEnqueue(value T, duration time.Duration) { +func (q *DelayLkQueue[TValue, TKey]) DelayEnqueue(value TValue, duration time.Duration) { time.AfterFunc(duration, func() { q.Enqueue(value) }) } +// CancellableDelayEnqueue 可取消的延迟入队 +func (q *DelayLkQueue[TValue, TKey]) CancellableDelayEnqueue(key TKey, value TValue, duration time.Duration) { + if timer, ok := q.timers[key]; ok { + timer.Stop() + } + q.timers[key] = time.AfterFunc(duration, func() { + delete(q.timers, key) + q.Enqueue(value) + }) +} + +// CancelDelayEnqueue 取消延迟入队 +func (q *DelayLkQueue[TValue, TKey]) CancelDelayEnqueue(key TKey) { + if timer, ok := q.timers[key]; ok { + delete(q.timers, key) + timer.Stop() + } +} + // ContinuousDequeue 持续监听出队 -func (q *DelayLkQueue[T]) ContinuousDequeue() iter.Seq[T] { - return func(yield func(T) bool) { +func (q *DelayLkQueue[TValue, TKey]) ContinuousDequeue() iter.Seq[TValue] { + return func(yield func(TValue) bool) { for { if value, ok := q.Dequeue(); ok { if !yield(value) { @@ -37,15 +57,15 @@ func (q *DelayLkQueue[T]) ContinuousDequeue() iter.Seq[T] { } // ContinuousDequeueExecute 持续监听出队执行函数 -func (q *DelayLkQueue[T]) ContinuousDequeueExecute(fn func(T)) { +func (q *DelayLkQueue[TValue, TKey]) ContinuousDequeueExecute(fn func(TValue)) { for value := range q.ContinuousDequeue() { fn(value) } } // ContinuousDequeueNotify 持续监听出队通知 -func (q *DelayLkQueue[T]) ContinuousDequeueNotify(chs ...chan T) { - q.ContinuousDequeueExecute(func(value T) { +func (q *DelayLkQueue[TValue, TKey]) ContinuousDequeueNotify(chs ...chan TValue) { + q.ContinuousDequeueExecute(func(value TValue) { for _, ch := range chs { ch <- value } diff --git a/lock_free/queues.go b/lock_free/queues.go new file mode 100644 index 0000000..1fdafd1 --- /dev/null +++ b/lock_free/queues.go @@ -0,0 +1,86 @@ +package lock_free + +import ( + "iter" + "time" +) + +// Queues 队列集合 +type Queues[TValue any, TKey, TRoute comparable] struct { + queues map[TRoute]*DelayLkQueue[TValue, TKey] +} + +// NewQueues 创建队列集合 +func NewQueues[TValue any, TKey, TRoute comparable]() *Queues[TValue, TKey, TRoute] { + return &Queues[TValue, TKey, TRoute]{make(map[TRoute]*DelayLkQueue[TValue, TKey])} +} + +// Enqueue 入队 +func (q *Queues[TValue, TKey, TRoute]) Enqueue(route TRoute, value TValue) { + if queue, ok := q.queues[route]; ok { + queue.Enqueue(value) + } else { + queue := NewDelayLkQueue[TValue, TKey]() + q.queues[route] = queue + queue.Enqueue(value) + } +} + +// Dequeue 出队 +func (q *Queues[TValue, TKey, TRoute]) Dequeue(route TRoute) (value TValue, ok bool) { + if queue, ok := q.queues[route]; ok { + return queue.Dequeue() + } + return value, false +} + +// DelayEnqueue 延迟入队 +func (q *Queues[TValue, TKey, TRoute]) DelayEnqueue(route TRoute, value TValue, duration time.Duration) { + if queue, ok := q.queues[route]; ok { + queue.DelayEnqueue(value, duration) + } else { + queue := NewDelayLkQueue[TValue, TKey]() + q.queues[route] = queue + queue.DelayEnqueue(value, duration) + } +} + +// CancellableDelayEnqueue 可取消的延迟入队 +func (q *Queues[TValue, TKey, TRoute]) CancellableDelayEnqueue(route TRoute, key TKey, value TValue, duration time.Duration) { + if queue, ok := q.queues[route]; ok { + queue.CancellableDelayEnqueue(key, value, duration) + } else { + queue := NewDelayLkQueue[TValue, TKey]() + q.queues[route] = queue + queue.CancellableDelayEnqueue(key, value, duration) + } +} + +// CancelDelayEnqueue 取消延迟入队 +func (q *Queues[TValue, TKey, TRoute]) CancelDelayEnqueue(route TRoute, key TKey) { + if queue, ok := q.queues[route]; ok { + queue.CancelDelayEnqueue(key) + } +} + +// ContinuousDequeue 持续监听出队 +func (q *Queues[TValue, TKey, TRoute]) ContinuousDequeue(route TRoute) iter.Seq[TValue] { + if queue, ok := q.queues[route]; ok { + return queue.ContinuousDequeue() + } + return nil +} + +// ContinuousDequeueExecute 持续监听出队执行函数 +func (q *Queues[TValue, TKey, TRoute]) ContinuousDequeueExecute(route TRoute, fn func(TValue)) { + if queue, ok := q.queues[route]; ok { + queue.ContinuousDequeueExecute(fn) + } +} + +// ContinuousDequeueNotify 持续监听出队通知 +func (q *Queues[TValue, TKey, TRoute]) ContinuousDequeueNotify(route TRoute, chs ...chan TValue) { + if queue, ok := q.queues[route]; ok { + queue.ContinuousDequeueNotify(chs...) + } +} diff --git a/lock_free/delay_queue_test.go b/lock_free/queues_test.go similarity index 66% rename from lock_free/delay_queue_test.go rename to lock_free/queues_test.go index 4cf151f..32dee54 100644 --- a/lock_free/delay_queue_test.go +++ b/lock_free/queues_test.go @@ -5,7 +5,7 @@ import ( "time" ) -func TestDelayLkQueue(t *testing.T) { +func TestQueues(t *testing.T) { cases := []struct { value int duration time.Duration @@ -13,10 +13,11 @@ func TestDelayLkQueue(t *testing.T) { {1, time.Second}, {3, time.Second * 3}, } - q := NewDelayLkQueue[int]() + route := "test" + q := NewQueues[int, struct{}, string]() for _, c := range cases { - q.DelayEnqueue(c.value, c.duration) + q.DelayEnqueue(route, c.value, c.duration) } notify := make(chan int) @@ -27,7 +28,7 @@ func TestDelayLkQueue(t *testing.T) { } }() - go q.ContinuousDequeueNotify(notify) + go q.ContinuousDequeueNotify(route, notify) time.Sleep(time.Second * 5) close(notify) }