go-study/lock_free/queues.go

192 lines
5.5 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package lock_free
import (
"iter"
"sync"
"time"
)
// Queues 队列集合
type Queues[TKey comparable, TValue, TRoute any] struct {
queues sync.Map // 读多写少的场景,适合用 sync.Map 路由队列
ack sync.Map // 存储队列确认channel
msgs sync.Map // 存储消费消息用于ack失败时重新入队
autoAck bool // 是否自动确认
expireAutoFail time.Duration // 手动确认截止时间
}
// NewQueues 创建队列集合
func NewQueues[TKey comparable, TValue, TRoute any](opt ...Option) *Queues[TKey, TValue, TRoute] {
opts := loadOptions(opt...)
return &Queues[TKey, TValue, TRoute]{autoAck: opts.autoAck, expireAutoFail: opts.expireAutoFail}
}
// SetAutoAck 设置自动确认
func (q *Queues[TKey, TValue, TRoute]) SetAutoAck(autoAck bool) {
q.autoAck = autoAck
}
// SetExpireAutoFail 设置手动确认截止时间
func (q *Queues[TKey, TValue, TRoute]) SetExpireAutoFail(expireAutoFail time.Duration) {
q.expireAutoFail = expireAutoFail
}
// checkAckStatus 检查确认状态
func (q *Queues[TKey, TValue, TRoute]) checkAckStatus(route TRoute) {
if ack, ok := q.ack.Load(route); ok && !q.autoAck {
if status, exist := <-ack.(chan bool); !status && exist {
if msg, ok := q.msgs.LoadAndDelete(route); ok {
// 重新入队
q.Enqueue(route, msg.(TValue))
}
}
}
}
// Enqueue 入队
func (q *Queues[TKey, TValue, TRoute]) Enqueue(route TRoute, value TValue) {
// 入队时检查上一次出队是否确认成功
q.checkAckStatus(route)
if queue, ok := q.queues.Load(route); ok {
queue.(*DelayLkQueue[TKey, TValue]).Enqueue(value)
} else {
queue := NewDelayLkQueue[TKey, TValue]()
q.queues.Store(route, queue)
queue.Enqueue(value)
}
}
// makeAckData 生成确认数据,并持续监听
func (q *Queues[TKey, TValue, TRoute]) makeAckData(route TRoute, value TValue, valid bool) {
if !q.autoAck && valid {
var ack chan bool
// ack数据只需生成一次
if a, ok := q.ack.Load(route); !ok {
ack = make(chan bool, 1)
q.ack.Store(route, ack)
} else {
ack = a.(chan bool)
}
q.msgs.Store(route, value)
// todo 可考虑使用协程池
go func() {
if queue, ok := q.queues.Load(route); ok {
select {
case status := <-ack:
if !status {
// 重新入队
queue.(*DelayLkQueue[TKey, TValue]).Enqueue(value)
}
case <-time.After(q.expireAutoFail):
// 重新入队
queue.(*DelayLkQueue[TKey, TValue]).Enqueue(value)
}
}
}()
}
}
// Dequeue 出队
func (q *Queues[TKey, TValue, TRoute]) Dequeue(route TRoute) (value TValue, valid bool) {
if queue, ok := q.queues.Load(route); ok {
value, valid = queue.(*DelayLkQueue[TKey, TValue]).Dequeue()
q.makeAckData(route, value, valid)
return value, valid
}
return value, false
}
// Ack 确认消息
func (q *Queues[TKey, TValue, TRoute]) Ack(route TRoute, status bool) {
if ack, ok := q.ack.Load(route); ok && !q.autoAck {
ack.(chan bool) <- status
}
}
// Len 队列长度
func (q *Queues[TKey, TValue, TRoute]) Len(route TRoute) int {
if queue, ok := q.queues.Load(route); ok {
return queue.(*DelayLkQueue[TKey, TValue]).Len()
}
return 0
}
// DelayCount 获取延时队列数量
func (q *Queues[TKey, TValue, TRoute]) DelayCount(route TRoute) uint64 {
if queue, ok := q.queues.Load(route); ok {
return queue.(*DelayLkQueue[TKey, TValue]).DelayCount()
}
return 0
}
// DelayEnqueue 延迟入队
func (q *Queues[TKey, TValue, TRoute]) DelayEnqueue(route TRoute, value TValue, duration time.Duration) {
// 入队时检查上一次出队是否确认成功
q.checkAckStatus(route)
if queue, ok := q.queues.Load(route); ok {
queue.(*DelayLkQueue[TKey, TValue]).DelayEnqueue(value, duration)
} else {
queue := NewDelayLkQueue[TKey, TValue]()
q.queues.Store(route, queue)
queue.DelayEnqueue(value, duration)
}
}
// CancellableDelayEnqueue 可取消的延迟入队
func (q *Queues[TKey, TValue, TRoute]) CancellableDelayEnqueue(route TRoute, key TKey, value TValue, duration time.Duration) {
// 入队时检查上一次出队是否确认成功
q.checkAckStatus(route)
if queue, ok := q.queues.Load(route); ok {
queue.(*DelayLkQueue[TKey, TValue]).CancellableDelayEnqueue(key, value, duration)
} else {
queue := NewDelayLkQueue[TKey, TValue]()
q.queues.Store(route, queue)
queue.CancellableDelayEnqueue(key, value, duration)
}
}
// CancelDelayEnqueue 取消延迟入队
func (q *Queues[TKey, TValue, TRoute]) CancelDelayEnqueue(route TRoute, key TKey) {
if queue, ok := q.queues.Load(route); ok {
queue.(*DelayLkQueue[TKey, TValue]).CancelDelayEnqueue(key)
}
}
// ContinuousDequeue 持续监听出队
func (q *Queues[TKey, TValue, TRoute]) ContinuousDequeue(route TRoute) iter.Seq[TValue] {
if queue, ok := q.queues.Load(route); ok {
// 因需增加确认状态,故重新封装
return func(yield func(TValue) bool) {
for value := range queue.(*DelayLkQueue[TKey, TValue]).ContinuousDequeue() {
q.makeAckData(route, value, true)
if !yield(value) {
return
}
}
}
}
return nil
}
// ContinuousDequeueExecute 持续监听出队执行函数
func (q *Queues[TKey, TValue, TRoute]) ContinuousDequeueExecute(route TRoute, fn func(TValue)) {
for value := range q.ContinuousDequeue(route) {
fn(value)
}
}
// ContinuousDequeueNotify 持续监听出队通知
func (q *Queues[TKey, TValue, TRoute]) ContinuousDequeueNotify(route TRoute, chs ...chan TValue) {
q.ContinuousDequeueExecute(route, func(value TValue) {
for _, ch := range chs {
ch <- value
}
})
}