go-study/lock_free/queues.go
fantasticbin 5b48ea1a62 使用codex优化代码,具体如下:
主要优化

  - err_group:去掉多余 goroutine,避免潜在泄漏;并把并发 append 改为按下标写
    入,消除数据竞争。
  - err_group 测试稳定性增强:放宽超时并增加结果长度断言。
  - semaphore:修复等待队列元素类型断言错误(*waiter);补充非法参数校验(负数
    acquire/release)。
  - SemaChan:修复 Lock/Unlock 逻辑(初始化令牌桶),避免永久阻塞。
  - observer:修复“每次 Notify 都启动新 fanout 协程”的问题:改为 sync.Once 只启动一次
    fanOut。
  - observer:修复并发读写观察者列表问题:给 Attach/Detach/fanOut 增加读写锁保护。
  - observer:去掉 fanout 内部额外再起 goroutine和自动关闭所有 observer 的行为,避
    免重复关闭/竞态风险(仍保留 Detach 时关闭单个 observer)。
  - lock_free:修复可取消延迟队列的计数错误与 timers map 并发访问问题。
  - lock_free:checkAckStatus 改为非阻塞读取,避免入队路径被卡住。
  - routine:提供默认空任务并忽略 nil taskFn,防止空指针调用。
  - ticker:发送改为非阻塞,Stop 幂等化,降低阻塞和重复关闭风险。
  - query_builder:WaitAndGo 增加 goroutine 内 panic 转 error;测试里
    的 GORM filter 链式写法修正。

  新增测试

  - 新增 semaphore 测试,覆盖 Acquire/Release/TryAcquire 与 SemaChan 并发上限。
2026-03-05 21:53:11 +08:00

202 lines
5.6 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package lock_free
import (
"iter"
"sync"
"time"
)
// Queues 队列集合
type Queues[TKey comparable, TValue, TRoute any] struct {
queues sync.Map // 读多写少的场景,适合用 sync.Map 路由队列
ack sync.Map // 存储队列确认channel
msgs sync.Map // 存储消费消息用于ack失败时重新入队
autoAck bool // 是否自动确认
expireAutoFail time.Duration // 手动确认截止时间
}
// NewQueues 创建队列集合
func NewQueues[TKey comparable, TValue, TRoute any](opt ...Option) *Queues[TKey, TValue, TRoute] {
opts := loadOptions(opt...)
return &Queues[TKey, TValue, TRoute]{autoAck: opts.autoAck, expireAutoFail: opts.expireAutoFail}
}
// SetAutoAck 设置自动确认
func (q *Queues[TKey, TValue, TRoute]) SetAutoAck(autoAck bool) {
q.autoAck = autoAck
}
// SetExpireAutoFail 设置手动确认截止时间
func (q *Queues[TKey, TValue, TRoute]) SetExpireAutoFail(expireAutoFail time.Duration) {
q.expireAutoFail = expireAutoFail
}
// checkAckStatus 检查确认状态
func (q *Queues[TKey, TValue, TRoute]) checkAckStatus(route TRoute) {
if ack, ok := q.ack.Load(route); ok && !q.autoAck {
ackChan := ack.(chan bool)
select {
case status, exist := <-ackChan:
if !exist {
q.ack.Delete(route)
return
}
if !status {
if msg, loaded := q.msgs.LoadAndDelete(route); loaded {
// 重新入队
q.Enqueue(route, msg.(TValue))
}
}
default:
}
}
}
// Enqueue 入队
func (q *Queues[TKey, TValue, TRoute]) Enqueue(route TRoute, value TValue) {
// 入队时检查上一次出队是否确认成功
q.checkAckStatus(route)
if queue, ok := q.queues.Load(route); ok {
queue.(*DelayLkQueue[TKey, TValue]).Enqueue(value)
} else {
queue := NewDelayLkQueue[TKey, TValue]()
q.queues.Store(route, queue)
queue.Enqueue(value)
}
}
// makeAckData 生成确认数据,并持续监听
func (q *Queues[TKey, TValue, TRoute]) makeAckData(route TRoute, value TValue, valid bool) {
if !q.autoAck && valid {
var ack chan bool
// ack数据只需生成一次
if a, ok := q.ack.Load(route); !ok {
ack = make(chan bool, 1)
q.ack.Store(route, ack)
} else {
ack = a.(chan bool)
}
q.msgs.Store(route, value)
// todo 可考虑使用协程池
go func() {
if queue, ok := q.queues.Load(route); ok {
select {
case status := <-ack:
if !status {
// 重新入队
queue.(*DelayLkQueue[TKey, TValue]).Enqueue(value)
}
case <-time.After(q.expireAutoFail):
// 重新入队
queue.(*DelayLkQueue[TKey, TValue]).Enqueue(value)
}
}
}()
}
}
// Dequeue 出队
func (q *Queues[TKey, TValue, TRoute]) Dequeue(route TRoute) (value TValue, valid bool) {
if queue, ok := q.queues.Load(route); ok {
value, valid = queue.(*DelayLkQueue[TKey, TValue]).Dequeue()
q.makeAckData(route, value, valid)
return value, valid
}
return value, false
}
// Ack 确认消息
func (q *Queues[TKey, TValue, TRoute]) Ack(route TRoute, status bool) {
if ack, ok := q.ack.Load(route); ok && !q.autoAck {
ack.(chan bool) <- status
}
}
// Len 队列长度
func (q *Queues[TKey, TValue, TRoute]) Len(route TRoute) int {
if queue, ok := q.queues.Load(route); ok {
return queue.(*DelayLkQueue[TKey, TValue]).Len()
}
return 0
}
// DelayCount 获取延时队列数量
func (q *Queues[TKey, TValue, TRoute]) DelayCount(route TRoute) uint64 {
if queue, ok := q.queues.Load(route); ok {
return queue.(*DelayLkQueue[TKey, TValue]).DelayCount()
}
return 0
}
// DelayEnqueue 延迟入队
func (q *Queues[TKey, TValue, TRoute]) DelayEnqueue(route TRoute, value TValue, duration time.Duration) {
// 入队时检查上一次出队是否确认成功
q.checkAckStatus(route)
if queue, ok := q.queues.Load(route); ok {
queue.(*DelayLkQueue[TKey, TValue]).DelayEnqueue(value, duration)
} else {
queue := NewDelayLkQueue[TKey, TValue]()
q.queues.Store(route, queue)
queue.DelayEnqueue(value, duration)
}
}
// CancellableDelayEnqueue 可取消的延迟入队
func (q *Queues[TKey, TValue, TRoute]) CancellableDelayEnqueue(route TRoute, key TKey, value TValue, duration time.Duration) {
// 入队时检查上一次出队是否确认成功
q.checkAckStatus(route)
if queue, ok := q.queues.Load(route); ok {
queue.(*DelayLkQueue[TKey, TValue]).CancellableDelayEnqueue(key, value, duration)
} else {
queue := NewDelayLkQueue[TKey, TValue]()
q.queues.Store(route, queue)
queue.CancellableDelayEnqueue(key, value, duration)
}
}
// CancelDelayEnqueue 取消延迟入队
func (q *Queues[TKey, TValue, TRoute]) CancelDelayEnqueue(route TRoute, key TKey) {
if queue, ok := q.queues.Load(route); ok {
queue.(*DelayLkQueue[TKey, TValue]).CancelDelayEnqueue(key)
}
}
// ContinuousDequeue 持续监听出队
func (q *Queues[TKey, TValue, TRoute]) ContinuousDequeue(route TRoute) iter.Seq[TValue] {
if queue, ok := q.queues.Load(route); ok {
// 因需增加确认状态,故重新封装
return func(yield func(TValue) bool) {
for value := range queue.(*DelayLkQueue[TKey, TValue]).ContinuousDequeue() {
q.makeAckData(route, value, true)
if !yield(value) {
return
}
}
}
}
return nil
}
// ContinuousDequeueExecute 持续监听出队执行函数
func (q *Queues[TKey, TValue, TRoute]) ContinuousDequeueExecute(route TRoute, fn func(TValue)) {
for value := range q.ContinuousDequeue(route) {
fn(value)
}
}
// ContinuousDequeueNotify 持续监听出队通知
func (q *Queues[TKey, TValue, TRoute]) ContinuousDequeueNotify(route TRoute, chs ...chan TValue) {
q.ContinuousDequeueExecute(route, func(value TValue) {
for _, ch := range chs {
ch <- value
}
})
}