package lock_free import ( "iter" "sync" "time" ) // Queues 队列集合 type Queues[TKey comparable, TValue, TRoute any] struct { queues sync.Map // 读多写少的场景,适合用 sync.Map 路由队列 } // NewQueues 创建队列集合 func NewQueues[TKey comparable, TValue, TRoute any]() *Queues[TKey, TValue, TRoute] { return &Queues[TKey, TValue, TRoute]{} } // Enqueue 入队 func (q *Queues[TKey, TValue, TRoute]) Enqueue(route TRoute, value TValue) { if queue, ok := q.queues.Load(route); ok { queue.(*DelayLkQueue[TKey, TValue]).Enqueue(value) } else { queue := NewDelayLkQueue[TKey, TValue]() q.queues.Store(route, queue) queue.Enqueue(value) } } // Dequeue 出队 func (q *Queues[TKey, TValue, TRoute]) Dequeue(route TRoute) (value TValue, ok bool) { if queue, ok := q.queues.Load(route); ok { return queue.(*DelayLkQueue[TKey, TValue]).Dequeue() } return value, false } // Len 队列长度 func (q *Queues[TKey, TValue, TRoute]) Len(route TRoute) int { if queue, ok := q.queues.Load(route); ok { return queue.(*DelayLkQueue[TKey, TValue]).Len() } return 0 } // DelayEnqueue 延迟入队 func (q *Queues[TKey, TValue, TRoute]) DelayEnqueue(route TRoute, value TValue, duration time.Duration) { if queue, ok := q.queues.Load(route); ok { queue.(*DelayLkQueue[TKey, TValue]).DelayEnqueue(value, duration) } else { queue := NewDelayLkQueue[TKey, TValue]() q.queues.Store(route, queue) queue.DelayEnqueue(value, duration) } } // CancellableDelayEnqueue 可取消的延迟入队 func (q *Queues[TKey, TValue, TRoute]) CancellableDelayEnqueue(route TRoute, key TKey, value TValue, duration time.Duration) { if queue, ok := q.queues.Load(route); ok { queue.(*DelayLkQueue[TKey, TValue]).CancellableDelayEnqueue(key, value, duration) } else { queue := NewDelayLkQueue[TKey, TValue]() q.queues.Store(route, queue) queue.CancellableDelayEnqueue(key, value, duration) } } // CancelDelayEnqueue 取消延迟入队 func (q *Queues[TKey, TValue, TRoute]) CancelDelayEnqueue(route TRoute, key TKey) { if queue, ok := q.queues.Load(route); ok { queue.(*DelayLkQueue[TKey, TValue]).CancelDelayEnqueue(key) } } // ContinuousDequeue 持续监听出队 func (q *Queues[TKey, TValue, TRoute]) ContinuousDequeue(route TRoute) iter.Seq[TValue] { if queue, ok := q.queues.Load(route); ok { return queue.(*DelayLkQueue[TKey, TValue]).ContinuousDequeue() } return nil } // ContinuousDequeueExecute 持续监听出队执行函数 func (q *Queues[TKey, TValue, TRoute]) ContinuousDequeueExecute(route TRoute, fn func(TValue)) { if queue, ok := q.queues.Load(route); ok { queue.(*DelayLkQueue[TKey, TValue]).ContinuousDequeueExecute(fn) } } // ContinuousDequeueNotify 持续监听出队通知 func (q *Queues[TKey, TValue, TRoute]) ContinuousDequeueNotify(route TRoute, chs ...chan TValue) { if queue, ok := q.queues.Load(route); ok { queue.(*DelayLkQueue[TKey, TValue]).ContinuousDequeueNotify(chs...) } }