延时队列增加数量获取功能

This commit is contained in:
fantasticbin 2024-12-10 17:38:01 +08:00
parent 49f4d119ec
commit 28086ea8ee

View File

@ -3,23 +3,37 @@ package lock_free
import ( import (
"iter" "iter"
"sync" "sync"
"sync/atomic"
"time" "time"
) )
type DelayLkQueue[TKey comparable, TValue any] struct { type DelayLkQueue[TKey comparable, TValue any] struct {
timers map[TKey]*time.Timer timers map[TKey]*time.Timer
m sync.Mutex delayCount atomic.Uint64 // 用于统计延时队列数量
m sync.Mutex
LkQueue[TValue] LkQueue[TValue]
} }
// NewDelayLkQueue 创建延迟无锁队列 // NewDelayLkQueue 创建延迟无锁队列
func NewDelayLkQueue[TKey comparable, TValue any]() *DelayLkQueue[TKey, TValue] { func NewDelayLkQueue[TKey comparable, TValue any]() *DelayLkQueue[TKey, TValue] {
return &DelayLkQueue[TKey, TValue]{make(map[TKey]*time.Timer), sync.Mutex{}, *NewLkQueue[TValue]()} return &DelayLkQueue[TKey, TValue]{
make(map[TKey]*time.Timer),
atomic.Uint64{},
sync.Mutex{},
*NewLkQueue[TValue](),
}
}
// DelayCount 获取延时队列数量
func (q *DelayLkQueue[TKey, TValue]) DelayCount() uint64 {
return q.delayCount.Load()
} }
// DelayEnqueue 延迟入队 // DelayEnqueue 延迟入队
func (q *DelayLkQueue[TKey, TValue]) DelayEnqueue(value TValue, duration time.Duration) { func (q *DelayLkQueue[TKey, TValue]) DelayEnqueue(value TValue, duration time.Duration) {
q.delayCount.Add(1)
time.AfterFunc(duration, func() { time.AfterFunc(duration, func() {
q.delayCount.Add(^uint64(0))
q.Enqueue(value) q.Enqueue(value)
}) })
} }
@ -31,7 +45,9 @@ func (q *DelayLkQueue[TKey, TValue]) CancellableDelayEnqueue(key TKey, value TVa
if timer, ok := q.timers[key]; ok { if timer, ok := q.timers[key]; ok {
timer.Stop() timer.Stop()
} }
q.delayCount.Add(1)
q.timers[key] = time.AfterFunc(duration, func() { q.timers[key] = time.AfterFunc(duration, func() {
q.delayCount.Add(^uint64(0))
delete(q.timers, key) delete(q.timers, key)
q.Enqueue(value) q.Enqueue(value)
}) })
@ -42,6 +58,7 @@ func (q *DelayLkQueue[TKey, TValue]) CancelDelayEnqueue(key TKey) {
q.m.Lock() q.m.Lock()
defer q.m.Unlock() defer q.m.Unlock()
if timer, ok := q.timers[key]; ok { if timer, ok := q.timers[key]; ok {
q.delayCount.Add(^uint64(0))
delete(q.timers, key) delete(q.timers, key)
timer.Stop() timer.Stop()
} }