package lock_free import ( "iter" "sync" "time" ) // Queues 队列集合 type Queues[TKey comparable, TValue, TRoute any] struct { queues sync.Map // 读多写少的场景,适合用 sync.Map 路由队列 ack sync.Map // 存储队列确认channel msgs sync.Map // 存储消费消息,用于ack失败时重新入队 autoAck bool // 是否自动确认 expireAutoFail time.Duration // 手动确认截止时间 } // NewQueues 创建队列集合 func NewQueues[TKey comparable, TValue, TRoute any](opt ...Option) *Queues[TKey, TValue, TRoute] { opts := loadOptions(opt...) return &Queues[TKey, TValue, TRoute]{autoAck: opts.autoAck, expireAutoFail: opts.expireAutoFail} } // SetAutoAck 设置自动确认 func (q *Queues[TKey, TValue, TRoute]) SetAutoAck(autoAck bool) { q.autoAck = autoAck } // SetExpireAutoFail 设置手动确认截止时间 func (q *Queues[TKey, TValue, TRoute]) SetExpireAutoFail(expireAutoFail time.Duration) { q.expireAutoFail = expireAutoFail } // checkAckStatus 检查确认状态 func (q *Queues[TKey, TValue, TRoute]) checkAckStatus(route TRoute) { if ack, ok := q.ack.Load(route); ok && !q.autoAck { if status, exist := <-ack.(chan bool); !status && exist { if msg, ok := q.msgs.LoadAndDelete(route); ok { // 重新入队 q.Enqueue(route, msg.(TValue)) } } } } // Enqueue 入队 func (q *Queues[TKey, TValue, TRoute]) Enqueue(route TRoute, value TValue) { // 入队时检查上一次出队是否确认成功 q.checkAckStatus(route) if queue, ok := q.queues.Load(route); ok { queue.(*DelayLkQueue[TKey, TValue]).Enqueue(value) } else { queue := NewDelayLkQueue[TKey, TValue]() q.queues.Store(route, queue) queue.Enqueue(value) } } // makeAckData 生成确认数据,并持续监听 func (q *Queues[TKey, TValue, TRoute]) makeAckData(route TRoute, value TValue, valid bool) { if !q.autoAck && valid { var ack chan bool // ack数据只需生成一次 if a, ok := q.ack.Load(route); !ok { ack = make(chan bool, 1) q.ack.Store(route, ack) } else { ack = a.(chan bool) } q.msgs.Store(route, value) // todo 可考虑使用协程池 go func() { if queue, ok := q.queues.Load(route); ok { select { case status := <-ack: if !status { // 重新入队 queue.(*DelayLkQueue[TKey, TValue]).Enqueue(value) } case <-time.After(q.expireAutoFail): // 重新入队 queue.(*DelayLkQueue[TKey, TValue]).Enqueue(value) } } }() } } // Dequeue 出队 func (q *Queues[TKey, TValue, TRoute]) Dequeue(route TRoute) (value TValue, valid bool) { if queue, ok := q.queues.Load(route); ok { value, valid = queue.(*DelayLkQueue[TKey, TValue]).Dequeue() q.makeAckData(route, value, valid) return value, valid } return value, false } // Ack 确认消息 func (q *Queues[TKey, TValue, TRoute]) Ack(route TRoute, status bool) { if ack, ok := q.ack.Load(route); ok && !q.autoAck { ack.(chan bool) <- status } } // Len 队列长度 func (q *Queues[TKey, TValue, TRoute]) Len(route TRoute) int { if queue, ok := q.queues.Load(route); ok { return queue.(*DelayLkQueue[TKey, TValue]).Len() } return 0 } // DelayCount 获取延时队列数量 func (q *Queues[TKey, TValue, TRoute]) DelayCount(route TRoute) uint64 { if queue, ok := q.queues.Load(route); ok { return queue.(*DelayLkQueue[TKey, TValue]).DelayCount() } return 0 } // DelayEnqueue 延迟入队 func (q *Queues[TKey, TValue, TRoute]) DelayEnqueue(route TRoute, value TValue, duration time.Duration) { // 入队时检查上一次出队是否确认成功 q.checkAckStatus(route) if queue, ok := q.queues.Load(route); ok { queue.(*DelayLkQueue[TKey, TValue]).DelayEnqueue(value, duration) } else { queue := NewDelayLkQueue[TKey, TValue]() q.queues.Store(route, queue) queue.DelayEnqueue(value, duration) } } // CancellableDelayEnqueue 可取消的延迟入队 func (q *Queues[TKey, TValue, TRoute]) CancellableDelayEnqueue(route TRoute, key TKey, value TValue, duration time.Duration) { // 入队时检查上一次出队是否确认成功 q.checkAckStatus(route) if queue, ok := q.queues.Load(route); ok { queue.(*DelayLkQueue[TKey, TValue]).CancellableDelayEnqueue(key, value, duration) } else { queue := NewDelayLkQueue[TKey, TValue]() q.queues.Store(route, queue) queue.CancellableDelayEnqueue(key, value, duration) } } // CancelDelayEnqueue 取消延迟入队 func (q *Queues[TKey, TValue, TRoute]) CancelDelayEnqueue(route TRoute, key TKey) { if queue, ok := q.queues.Load(route); ok { queue.(*DelayLkQueue[TKey, TValue]).CancelDelayEnqueue(key) } } // ContinuousDequeue 持续监听出队 func (q *Queues[TKey, TValue, TRoute]) ContinuousDequeue(route TRoute) iter.Seq[TValue] { if queue, ok := q.queues.Load(route); ok { // 因需增加确认状态,故重新封装 return func(yield func(TValue) bool) { for value := range queue.(*DelayLkQueue[TKey, TValue]).ContinuousDequeue() { q.makeAckData(route, value, true) if !yield(value) { return } } } } return nil } // ContinuousDequeueExecute 持续监听出队执行函数 func (q *Queues[TKey, TValue, TRoute]) ContinuousDequeueExecute(route TRoute, fn func(TValue)) { for value := range q.ContinuousDequeue(route) { fn(value) } } // ContinuousDequeueNotify 持续监听出队通知 func (q *Queues[TKey, TValue, TRoute]) ContinuousDequeueNotify(route TRoute, chs ...chan TValue) { q.ContinuousDequeueExecute(route, func(value TValue) { for _, ch := range chs { ch <- value } }) }