package lock_free import ( "iter" "sync" "sync/atomic" "time" ) type DelayLkQueue[TKey comparable, TValue any] struct { timers map[TKey]*time.Timer delayCount atomic.Uint64 // 用于统计延时队列数量 m sync.Mutex LkQueue[TValue] } // NewDelayLkQueue 创建延迟无锁队列 func NewDelayLkQueue[TKey comparable, TValue any]() *DelayLkQueue[TKey, TValue] { return &DelayLkQueue[TKey, TValue]{ make(map[TKey]*time.Timer), atomic.Uint64{}, sync.Mutex{}, *NewLkQueue[TValue](), } } // DelayCount 获取延时队列数量 func (q *DelayLkQueue[TKey, TValue]) DelayCount() uint64 { return q.delayCount.Load() } // DelayEnqueue 延迟入队 func (q *DelayLkQueue[TKey, TValue]) DelayEnqueue(value TValue, duration time.Duration) { q.delayCount.Add(1) time.AfterFunc(duration, func() { q.delayCount.Add(^uint64(0)) q.Enqueue(value) }) } // CancellableDelayEnqueue 可取消的延迟入队 func (q *DelayLkQueue[TKey, TValue]) CancellableDelayEnqueue(key TKey, value TValue, duration time.Duration) { q.m.Lock() defer q.m.Unlock() if timer, ok := q.timers[key]; ok { timer.Stop() } q.delayCount.Add(1) q.timers[key] = time.AfterFunc(duration, func() { q.delayCount.Add(^uint64(0)) delete(q.timers, key) q.Enqueue(value) }) } // CancelDelayEnqueue 取消延迟入队 func (q *DelayLkQueue[TKey, TValue]) CancelDelayEnqueue(key TKey) { q.m.Lock() defer q.m.Unlock() if timer, ok := q.timers[key]; ok { q.delayCount.Add(^uint64(0)) delete(q.timers, key) timer.Stop() } } // ContinuousDequeue 持续监听出队 func (q *DelayLkQueue[TKey, TValue]) ContinuousDequeue() iter.Seq[TValue] { return func(yield func(TValue) bool) { for { if value, ok := q.Dequeue(); ok { if !yield(value) { return } } else { time.Sleep(time.Millisecond) // 队列为空,休眠1毫秒 } } } } // ContinuousDequeueExecute 持续监听出队执行函数 func (q *DelayLkQueue[TKey, TValue]) ContinuousDequeueExecute(fn func(TValue)) { for value := range q.ContinuousDequeue() { fn(value) } } // ContinuousDequeueNotify 持续监听出队通知 func (q *DelayLkQueue[TKey, TValue]) ContinuousDequeueNotify(chs ...chan TValue) { q.ContinuousDequeueExecute(func(value TValue) { for _, ch := range chs { ch <- value } }) }