From 95c27f6e965dda4205ad7ad4d4759cf00609d510 Mon Sep 17 00:00:00 2001 From: fantasticbin Date: Mon, 9 Dec 2024 20:35:24 +0800 Subject: [PATCH] =?UTF-8?q?=E5=BB=B6=E8=BF=9F=E9=98=9F=E5=88=97=E5=8F=8A?= =?UTF-8?q?=E9=98=9F=E5=88=97=E9=9B=86=E5=90=88=E5=A2=9E=E5=8A=A0=E9=94=81?= =?UTF-8?q?=E6=8E=A7=E5=88=B6map=E8=AF=BB=E5=86=99?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- lock_free/delay_queue.go | 8 +++++++- lock_free/queues.go | 28 +++++++++++++++++++++++++++- 2 files changed, 34 insertions(+), 2 deletions(-) diff --git a/lock_free/delay_queue.go b/lock_free/delay_queue.go index fc60209..b7f5394 100644 --- a/lock_free/delay_queue.go +++ b/lock_free/delay_queue.go @@ -2,17 +2,19 @@ package lock_free import ( "iter" + "sync" "time" ) type DelayLkQueue[TValue any, TKey comparable] struct { timers map[TKey]*time.Timer + m sync.Mutex LkQueue[TValue] } // NewDelayLkQueue 创建延迟无锁队列 func NewDelayLkQueue[TValue any, TKey comparable]() *DelayLkQueue[TValue, TKey] { - return &DelayLkQueue[TValue, TKey]{make(map[TKey]*time.Timer), *NewLkQueue[TValue]()} + return &DelayLkQueue[TValue, TKey]{make(map[TKey]*time.Timer), sync.Mutex{}, *NewLkQueue[TValue]()} } // DelayEnqueue 延迟入队 @@ -27,6 +29,8 @@ func (q *DelayLkQueue[TValue, TKey]) CancellableDelayEnqueue(key TKey, value TVa if timer, ok := q.timers[key]; ok { timer.Stop() } + q.m.Lock() + defer q.m.Unlock() q.timers[key] = time.AfterFunc(duration, func() { delete(q.timers, key) q.Enqueue(value) @@ -35,6 +39,8 @@ func (q *DelayLkQueue[TValue, TKey]) CancellableDelayEnqueue(key TKey, value TVa // CancelDelayEnqueue 取消延迟入队 func (q *DelayLkQueue[TValue, TKey]) CancelDelayEnqueue(key TKey) { + q.m.Lock() + defer q.m.Unlock() if timer, ok := q.timers[key]; ok { delete(q.timers, key) timer.Stop() diff --git a/lock_free/queues.go b/lock_free/queues.go index 1fdafd1..b557ca8 100644 --- a/lock_free/queues.go +++ b/lock_free/queues.go @@ -2,32 +2,41 @@ package lock_free import ( "iter" + "sync" "time" ) // Queues 队列集合 type Queues[TValue any, TKey, TRoute comparable] struct { queues map[TRoute]*DelayLkQueue[TValue, TKey] + m sync.RWMutex } // NewQueues 创建队列集合 func NewQueues[TValue any, TKey, TRoute comparable]() *Queues[TValue, TKey, TRoute] { - return &Queues[TValue, TKey, TRoute]{make(map[TRoute]*DelayLkQueue[TValue, TKey])} + return &Queues[TValue, TKey, TRoute]{make(map[TRoute]*DelayLkQueue[TValue, TKey]), sync.RWMutex{}} } // Enqueue 入队 func (q *Queues[TValue, TKey, TRoute]) Enqueue(route TRoute, value TValue) { + q.m.RLock() if queue, ok := q.queues[route]; ok { + q.m.RUnlock() queue.Enqueue(value) } else { + q.m.RUnlock() queue := NewDelayLkQueue[TValue, TKey]() + q.m.Lock() q.queues[route] = queue + q.m.Unlock() queue.Enqueue(value) } } // Dequeue 出队 func (q *Queues[TValue, TKey, TRoute]) Dequeue(route TRoute) (value TValue, ok bool) { + q.m.RLock() + defer q.m.RUnlock() if queue, ok := q.queues[route]; ok { return queue.Dequeue() } @@ -36,28 +45,39 @@ func (q *Queues[TValue, TKey, TRoute]) Dequeue(route TRoute) (value TValue, ok b // DelayEnqueue 延迟入队 func (q *Queues[TValue, TKey, TRoute]) DelayEnqueue(route TRoute, value TValue, duration time.Duration) { + q.m.RLock() if queue, ok := q.queues[route]; ok { + q.m.RUnlock() queue.DelayEnqueue(value, duration) } else { + q.m.RUnlock() queue := NewDelayLkQueue[TValue, TKey]() + q.m.Lock() q.queues[route] = queue + q.m.Unlock() queue.DelayEnqueue(value, duration) } } // CancellableDelayEnqueue 可取消的延迟入队 func (q *Queues[TValue, TKey, TRoute]) CancellableDelayEnqueue(route TRoute, key TKey, value TValue, duration time.Duration) { + q.m.RLock() + defer q.m.RUnlock() if queue, ok := q.queues[route]; ok { queue.CancellableDelayEnqueue(key, value, duration) } else { queue := NewDelayLkQueue[TValue, TKey]() + q.m.Lock() q.queues[route] = queue + q.m.Unlock() queue.CancellableDelayEnqueue(key, value, duration) } } // CancelDelayEnqueue 取消延迟入队 func (q *Queues[TValue, TKey, TRoute]) CancelDelayEnqueue(route TRoute, key TKey) { + q.m.RLock() + defer q.m.RUnlock() if queue, ok := q.queues[route]; ok { queue.CancelDelayEnqueue(key) } @@ -65,6 +85,8 @@ func (q *Queues[TValue, TKey, TRoute]) CancelDelayEnqueue(route TRoute, key TKey // ContinuousDequeue 持续监听出队 func (q *Queues[TValue, TKey, TRoute]) ContinuousDequeue(route TRoute) iter.Seq[TValue] { + q.m.RLock() + defer q.m.RUnlock() if queue, ok := q.queues[route]; ok { return queue.ContinuousDequeue() } @@ -73,6 +95,8 @@ func (q *Queues[TValue, TKey, TRoute]) ContinuousDequeue(route TRoute) iter.Seq[ // ContinuousDequeueExecute 持续监听出队执行函数 func (q *Queues[TValue, TKey, TRoute]) ContinuousDequeueExecute(route TRoute, fn func(TValue)) { + q.m.RLock() + defer q.m.RUnlock() if queue, ok := q.queues[route]; ok { queue.ContinuousDequeueExecute(fn) } @@ -80,6 +104,8 @@ func (q *Queues[TValue, TKey, TRoute]) ContinuousDequeueExecute(route TRoute, fn // ContinuousDequeueNotify 持续监听出队通知 func (q *Queues[TValue, TKey, TRoute]) ContinuousDequeueNotify(route TRoute, chs ...chan TValue) { + q.m.RLock() + defer q.m.RUnlock() if queue, ok := q.queues[route]; ok { queue.ContinuousDequeueNotify(chs...) }