diff --git a/lock_free/delay_queue.go b/lock_free/delay_queue.go index 417340e..3a279e6 100644 --- a/lock_free/delay_queue.go +++ b/lock_free/delay_queue.go @@ -3,23 +3,37 @@ package lock_free import ( "iter" "sync" + "sync/atomic" "time" ) type DelayLkQueue[TKey comparable, TValue any] struct { - timers map[TKey]*time.Timer - m sync.Mutex + timers map[TKey]*time.Timer + delayCount atomic.Uint64 // 用于统计延时队列数量 + m sync.Mutex LkQueue[TValue] } // NewDelayLkQueue 创建延迟无锁队列 func NewDelayLkQueue[TKey comparable, TValue any]() *DelayLkQueue[TKey, TValue] { - return &DelayLkQueue[TKey, TValue]{make(map[TKey]*time.Timer), sync.Mutex{}, *NewLkQueue[TValue]()} + return &DelayLkQueue[TKey, TValue]{ + make(map[TKey]*time.Timer), + atomic.Uint64{}, + sync.Mutex{}, + *NewLkQueue[TValue](), + } +} + +// DelayCount 获取延时队列数量 +func (q *DelayLkQueue[TKey, TValue]) DelayCount() uint64 { + return q.delayCount.Load() } // DelayEnqueue 延迟入队 func (q *DelayLkQueue[TKey, TValue]) DelayEnqueue(value TValue, duration time.Duration) { + q.delayCount.Add(1) time.AfterFunc(duration, func() { + q.delayCount.Add(^uint64(0)) q.Enqueue(value) }) } @@ -31,7 +45,9 @@ func (q *DelayLkQueue[TKey, TValue]) CancellableDelayEnqueue(key TKey, value TVa if timer, ok := q.timers[key]; ok { timer.Stop() } + q.delayCount.Add(1) q.timers[key] = time.AfterFunc(duration, func() { + q.delayCount.Add(^uint64(0)) delete(q.timers, key) q.Enqueue(value) }) @@ -42,6 +58,7 @@ func (q *DelayLkQueue[TKey, TValue]) CancelDelayEnqueue(key TKey) { q.m.Lock() defer q.m.Unlock() if timer, ok := q.timers[key]; ok { + q.delayCount.Add(^uint64(0)) delete(q.timers, key) timer.Stop() }