package lock_free import ( "iter" "time" ) // Queues 队列集合 type Queues[TValue any, TKey, TRoute comparable] struct { queues map[TRoute]*DelayLkQueue[TValue, TKey] } // NewQueues 创建队列集合 func NewQueues[TValue any, TKey, TRoute comparable]() *Queues[TValue, TKey, TRoute] { return &Queues[TValue, TKey, TRoute]{make(map[TRoute]*DelayLkQueue[TValue, TKey])} } // Enqueue 入队 func (q *Queues[TValue, TKey, TRoute]) Enqueue(route TRoute, value TValue) { if queue, ok := q.queues[route]; ok { queue.Enqueue(value) } else { queue := NewDelayLkQueue[TValue, TKey]() q.queues[route] = queue queue.Enqueue(value) } } // Dequeue 出队 func (q *Queues[TValue, TKey, TRoute]) Dequeue(route TRoute) (value TValue, ok bool) { if queue, ok := q.queues[route]; ok { return queue.Dequeue() } return value, false } // DelayEnqueue 延迟入队 func (q *Queues[TValue, TKey, TRoute]) DelayEnqueue(route TRoute, value TValue, duration time.Duration) { if queue, ok := q.queues[route]; ok { queue.DelayEnqueue(value, duration) } else { queue := NewDelayLkQueue[TValue, TKey]() q.queues[route] = queue queue.DelayEnqueue(value, duration) } } // CancellableDelayEnqueue 可取消的延迟入队 func (q *Queues[TValue, TKey, TRoute]) CancellableDelayEnqueue(route TRoute, key TKey, value TValue, duration time.Duration) { if queue, ok := q.queues[route]; ok { queue.CancellableDelayEnqueue(key, value, duration) } else { queue := NewDelayLkQueue[TValue, TKey]() q.queues[route] = queue queue.CancellableDelayEnqueue(key, value, duration) } } // CancelDelayEnqueue 取消延迟入队 func (q *Queues[TValue, TKey, TRoute]) CancelDelayEnqueue(route TRoute, key TKey) { if queue, ok := q.queues[route]; ok { queue.CancelDelayEnqueue(key) } } // ContinuousDequeue 持续监听出队 func (q *Queues[TValue, TKey, TRoute]) ContinuousDequeue(route TRoute) iter.Seq[TValue] { if queue, ok := q.queues[route]; ok { return queue.ContinuousDequeue() } return nil } // ContinuousDequeueExecute 持续监听出队执行函数 func (q *Queues[TValue, TKey, TRoute]) ContinuousDequeueExecute(route TRoute, fn func(TValue)) { if queue, ok := q.queues[route]; ok { queue.ContinuousDequeueExecute(fn) } } // ContinuousDequeueNotify 持续监听出队通知 func (q *Queues[TValue, TKey, TRoute]) ContinuousDequeueNotify(route TRoute, chs ...chan TValue) { if queue, ok := q.queues[route]; ok { queue.ContinuousDequeueNotify(chs...) } }