From 28086ea8ee5c774546550f28c6689c549d3ffb9c Mon Sep 17 00:00:00 2001 From: fantasticbin Date: Tue, 10 Dec 2024 17:38:01 +0800 Subject: [PATCH] =?UTF-8?q?=E5=BB=B6=E6=97=B6=E9=98=9F=E5=88=97=E5=A2=9E?= =?UTF-8?q?=E5=8A=A0=E6=95=B0=E9=87=8F=E8=8E=B7=E5=8F=96=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- lock_free/delay_queue.go | 23 ++++++++++++++++++++--- 1 file changed, 20 insertions(+), 3 deletions(-) diff --git a/lock_free/delay_queue.go b/lock_free/delay_queue.go index 417340e..3a279e6 100644 --- a/lock_free/delay_queue.go +++ b/lock_free/delay_queue.go @@ -3,23 +3,37 @@ package lock_free import ( "iter" "sync" + "sync/atomic" "time" ) type DelayLkQueue[TKey comparable, TValue any] struct { - timers map[TKey]*time.Timer - m sync.Mutex + timers map[TKey]*time.Timer + delayCount atomic.Uint64 // 用于统计延时队列数量 + m sync.Mutex LkQueue[TValue] } // NewDelayLkQueue 创建延迟无锁队列 func NewDelayLkQueue[TKey comparable, TValue any]() *DelayLkQueue[TKey, TValue] { - return &DelayLkQueue[TKey, TValue]{make(map[TKey]*time.Timer), sync.Mutex{}, *NewLkQueue[TValue]()} + return &DelayLkQueue[TKey, TValue]{ + make(map[TKey]*time.Timer), + atomic.Uint64{}, + sync.Mutex{}, + *NewLkQueue[TValue](), + } +} + +// DelayCount 获取延时队列数量 +func (q *DelayLkQueue[TKey, TValue]) DelayCount() uint64 { + return q.delayCount.Load() } // DelayEnqueue 延迟入队 func (q *DelayLkQueue[TKey, TValue]) DelayEnqueue(value TValue, duration time.Duration) { + q.delayCount.Add(1) time.AfterFunc(duration, func() { + q.delayCount.Add(^uint64(0)) q.Enqueue(value) }) } @@ -31,7 +45,9 @@ func (q *DelayLkQueue[TKey, TValue]) CancellableDelayEnqueue(key TKey, value TVa if timer, ok := q.timers[key]; ok { timer.Stop() } + q.delayCount.Add(1) q.timers[key] = time.AfterFunc(duration, func() { + q.delayCount.Add(^uint64(0)) delete(q.timers, key) q.Enqueue(value) }) @@ -42,6 +58,7 @@ func (q *DelayLkQueue[TKey, TValue]) CancelDelayEnqueue(key TKey) { q.m.Lock() defer q.m.Unlock() if timer, ok := q.timers[key]; ok { + q.delayCount.Add(^uint64(0)) delete(q.timers, key) timer.Stop() }