package lock_free import ( "iter" "sync" "time" ) // Queues 队列集合 type Queues[TValue any, TKey, TRoute comparable] struct { queues map[TRoute]*DelayLkQueue[TValue, TKey] m sync.RWMutex } // NewQueues 创建队列集合 func NewQueues[TValue any, TKey, TRoute comparable]() *Queues[TValue, TKey, TRoute] { return &Queues[TValue, TKey, TRoute]{make(map[TRoute]*DelayLkQueue[TValue, TKey]), sync.RWMutex{}} } // Enqueue 入队 func (q *Queues[TValue, TKey, TRoute]) Enqueue(route TRoute, value TValue) { q.m.RLock() if queue, ok := q.queues[route]; ok { q.m.RUnlock() queue.Enqueue(value) } else { q.m.RUnlock() queue := NewDelayLkQueue[TValue, TKey]() q.m.Lock() q.queues[route] = queue q.m.Unlock() queue.Enqueue(value) } } // Dequeue 出队 func (q *Queues[TValue, TKey, TRoute]) Dequeue(route TRoute) (value TValue, ok bool) { q.m.RLock() defer q.m.RUnlock() if queue, ok := q.queues[route]; ok { return queue.Dequeue() } return value, false } // DelayEnqueue 延迟入队 func (q *Queues[TValue, TKey, TRoute]) DelayEnqueue(route TRoute, value TValue, duration time.Duration) { q.m.RLock() if queue, ok := q.queues[route]; ok { q.m.RUnlock() queue.DelayEnqueue(value, duration) } else { q.m.RUnlock() queue := NewDelayLkQueue[TValue, TKey]() q.m.Lock() q.queues[route] = queue q.m.Unlock() queue.DelayEnqueue(value, duration) } } // CancellableDelayEnqueue 可取消的延迟入队 func (q *Queues[TValue, TKey, TRoute]) CancellableDelayEnqueue(route TRoute, key TKey, value TValue, duration time.Duration) { q.m.RLock() defer q.m.RUnlock() if queue, ok := q.queues[route]; ok { queue.CancellableDelayEnqueue(key, value, duration) } else { queue := NewDelayLkQueue[TValue, TKey]() q.m.Lock() q.queues[route] = queue q.m.Unlock() queue.CancellableDelayEnqueue(key, value, duration) } } // CancelDelayEnqueue 取消延迟入队 func (q *Queues[TValue, TKey, TRoute]) CancelDelayEnqueue(route TRoute, key TKey) { q.m.RLock() defer q.m.RUnlock() if queue, ok := q.queues[route]; ok { queue.CancelDelayEnqueue(key) } } // ContinuousDequeue 持续监听出队 func (q *Queues[TValue, TKey, TRoute]) ContinuousDequeue(route TRoute) iter.Seq[TValue] { q.m.RLock() defer q.m.RUnlock() if queue, ok := q.queues[route]; ok { return queue.ContinuousDequeue() } return nil } // ContinuousDequeueExecute 持续监听出队执行函数 func (q *Queues[TValue, TKey, TRoute]) ContinuousDequeueExecute(route TRoute, fn func(TValue)) { q.m.RLock() defer q.m.RUnlock() if queue, ok := q.queues[route]; ok { queue.ContinuousDequeueExecute(fn) } } // ContinuousDequeueNotify 持续监听出队通知 func (q *Queues[TValue, TKey, TRoute]) ContinuousDequeueNotify(route TRoute, chs ...chan TValue) { q.m.RLock() defer q.m.RUnlock() if queue, ok := q.queues[route]; ok { queue.ContinuousDequeueNotify(chs...) } }