From 1647f91ba44e318ce338d6e412b22d8ce4cef750 Mon Sep 17 00:00:00 2001 From: fantasticbin Date: Tue, 10 Dec 2024 12:51:17 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=8C=E5=96=84=E9=98=9F=E5=88=97=E9=9B=86?= =?UTF-8?q?=E5=90=88=E7=9A=84ACK=E7=A1=AE=E8=AE=A4=E6=9C=BA=E5=88=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- lock_free/option.go | 35 +++++++++++ lock_free/queues.go | 112 +++++++++++++++++++++++++++++++---- lock_free/queues_ack_test.go | 38 ++++++++++++ 3 files changed, 174 insertions(+), 11 deletions(-) create mode 100644 lock_free/option.go create mode 100644 lock_free/queues_ack_test.go diff --git a/lock_free/option.go b/lock_free/option.go new file mode 100644 index 0000000..129efaa --- /dev/null +++ b/lock_free/option.go @@ -0,0 +1,35 @@ +package lock_free + +import "time" + +type Option func(*options) + +type options struct { + autoAck bool + expireAutoFail time.Duration +} + +func loadOptions(opt ...Option) options { + opts := options{ + autoAck: true, + expireAutoFail: 10 * time.Second, + } + for _, o := range opt { + o(&opts) + } + return opts +} + +// WithAutoAck 设置自动确认 +func WithAutoAck(autoAck bool) Option { + return func(o *options) { + o.autoAck = autoAck + } +} + +// WithExpireAutoFail 设置手动确认截止时间 +func WithExpireAutoFail(expireAutoFail time.Duration) Option { + return func(o *options) { + o.expireAutoFail = expireAutoFail + } +} diff --git a/lock_free/queues.go b/lock_free/queues.go index 1ba6372..5db1ff8 100644 --- a/lock_free/queues.go +++ b/lock_free/queues.go @@ -1,6 +1,7 @@ package lock_free import ( + "fmt" "iter" "sync" "time" @@ -8,16 +9,46 @@ import ( // Queues 队列集合 type Queues[TKey comparable, TValue, TRoute any] struct { - queues sync.Map // 读多写少的场景,适合用 sync.Map 路由队列 + queues sync.Map // 读多写少的场景,适合用 sync.Map 路由队列 + ack sync.Map // 存储队列确认channel + msgs sync.Map // 存储消费消息,用于ack失败时重新入队 + autoAck bool // 是否自动确认 + expireAutoFail time.Duration // 手动确认截止时间 } // NewQueues 创建队列集合 -func NewQueues[TKey comparable, TValue, TRoute any]() *Queues[TKey, TValue, TRoute] { - return &Queues[TKey, TValue, TRoute]{} +func NewQueues[TKey comparable, TValue, TRoute any](opt ...Option) *Queues[TKey, TValue, TRoute] { + opts := loadOptions(opt...) + return &Queues[TKey, TValue, TRoute]{autoAck: opts.autoAck, expireAutoFail: opts.expireAutoFail} +} + +// SetAutoAck 设置自动确认 +func (q *Queues[TKey, TValue, TRoute]) SetAutoAck(autoAck bool) { + q.autoAck = autoAck +} + +// SetExpireAutoFail 设置手动确认截止时间 +func (q *Queues[TKey, TValue, TRoute]) SetExpireAutoFail(expireAutoFail time.Duration) { + q.expireAutoFail = expireAutoFail +} + +// checkAckStatus 检查确认状态 +func (q *Queues[TKey, TValue, TRoute]) checkAckStatus(route TRoute) { + if ack, ok := q.ack.Load(route); ok && !q.autoAck { + if status, exist := <-ack.(chan bool); !status && exist { + if msg, ok := q.msgs.Load(route); ok { + // 重新入队 + q.Enqueue(route, msg.(TValue)) + } + } + } } // Enqueue 入队 func (q *Queues[TKey, TValue, TRoute]) Enqueue(route TRoute, value TValue) { + // 入队时检查上一次出队是否确认成功 + q.checkAckStatus(route) + if queue, ok := q.queues.Load(route); ok { queue.(*DelayLkQueue[TKey, TValue]).Enqueue(value) } else { @@ -27,14 +58,57 @@ func (q *Queues[TKey, TValue, TRoute]) Enqueue(route TRoute, value TValue) { } } +// makeAckData 生成确认数据,并持续监听 +func (q *Queues[TKey, TValue, TRoute]) makeAckData(route TRoute, value TValue, valid bool) { + if !q.autoAck && valid { + var ack chan bool + // ack数据只需生成一次 + if a, ok := q.ack.Load(route); !ok { + ack = make(chan bool, 1) + q.ack.Store(route, ack) + } else { + ack = a.(chan bool) + } + + q.msgs.Store(route, value) + + // todo 可考虑使用协程池 + go func() { + if queue, ok := q.queues.Load(route); ok { + select { + case status := <-ack: + if !status { + // 重新入队 + queue.(*DelayLkQueue[TKey, TValue]).Enqueue(value) + } + + case <-time.After(q.expireAutoFail): + // 重新入队 + fmt.Println("ack timeout") + queue.(*DelayLkQueue[TKey, TValue]).Enqueue(value) + } + } + }() + } +} + // Dequeue 出队 -func (q *Queues[TKey, TValue, TRoute]) Dequeue(route TRoute) (value TValue, ok bool) { +func (q *Queues[TKey, TValue, TRoute]) Dequeue(route TRoute) (value TValue, valid bool) { if queue, ok := q.queues.Load(route); ok { - return queue.(*DelayLkQueue[TKey, TValue]).Dequeue() + value, valid = queue.(*DelayLkQueue[TKey, TValue]).Dequeue() + q.makeAckData(route, value, valid) + return value, valid } return value, false } +// Ack 确认消息 +func (q *Queues[TKey, TValue, TRoute]) Ack(route TRoute, status bool) { + if ack, ok := q.ack.Load(route); ok && !q.autoAck { + ack.(chan bool) <- status + } +} + // Len 队列长度 func (q *Queues[TKey, TValue, TRoute]) Len(route TRoute) int { if queue, ok := q.queues.Load(route); ok { @@ -45,6 +119,9 @@ func (q *Queues[TKey, TValue, TRoute]) Len(route TRoute) int { // DelayEnqueue 延迟入队 func (q *Queues[TKey, TValue, TRoute]) DelayEnqueue(route TRoute, value TValue, duration time.Duration) { + // 入队时检查上一次出队是否确认成功 + q.checkAckStatus(route) + if queue, ok := q.queues.Load(route); ok { queue.(*DelayLkQueue[TKey, TValue]).DelayEnqueue(value, duration) } else { @@ -56,6 +133,9 @@ func (q *Queues[TKey, TValue, TRoute]) DelayEnqueue(route TRoute, value TValue, // CancellableDelayEnqueue 可取消的延迟入队 func (q *Queues[TKey, TValue, TRoute]) CancellableDelayEnqueue(route TRoute, key TKey, value TValue, duration time.Duration) { + // 入队时检查上一次出队是否确认成功 + q.checkAckStatus(route) + if queue, ok := q.queues.Load(route); ok { queue.(*DelayLkQueue[TKey, TValue]).CancellableDelayEnqueue(key, value, duration) } else { @@ -75,21 +155,31 @@ func (q *Queues[TKey, TValue, TRoute]) CancelDelayEnqueue(route TRoute, key TKey // ContinuousDequeue 持续监听出队 func (q *Queues[TKey, TValue, TRoute]) ContinuousDequeue(route TRoute) iter.Seq[TValue] { if queue, ok := q.queues.Load(route); ok { - return queue.(*DelayLkQueue[TKey, TValue]).ContinuousDequeue() + // 因需增加确认状态,故重新封装 + return func(yield func(TValue) bool) { + for value := range queue.(*DelayLkQueue[TKey, TValue]).ContinuousDequeue() { + q.makeAckData(route, value, true) + if !yield(value) { + return + } + } + } } return nil } // ContinuousDequeueExecute 持续监听出队执行函数 func (q *Queues[TKey, TValue, TRoute]) ContinuousDequeueExecute(route TRoute, fn func(TValue)) { - if queue, ok := q.queues.Load(route); ok { - queue.(*DelayLkQueue[TKey, TValue]).ContinuousDequeueExecute(fn) + for value := range q.ContinuousDequeue(route) { + fn(value) } } // ContinuousDequeueNotify 持续监听出队通知 func (q *Queues[TKey, TValue, TRoute]) ContinuousDequeueNotify(route TRoute, chs ...chan TValue) { - if queue, ok := q.queues.Load(route); ok { - queue.(*DelayLkQueue[TKey, TValue]).ContinuousDequeueNotify(chs...) - } + q.ContinuousDequeueExecute(route, func(value TValue) { + for _, ch := range chs { + ch <- value + } + }) } diff --git a/lock_free/queues_ack_test.go b/lock_free/queues_ack_test.go new file mode 100644 index 0000000..04cc4e3 --- /dev/null +++ b/lock_free/queues_ack_test.go @@ -0,0 +1,38 @@ +package lock_free + +import ( + "testing" + "time" +) + +func TestQueuesAck(t *testing.T) { + cases := []int{1, 2, 3} + route := "test_ack" + q := NewQueues[struct{}, int, string]( + WithAutoAck(false), + WithExpireAutoFail(time.Second), + ) + + for _, c := range cases { + q.Enqueue(route, c) + } + + for range cases { + q.Dequeue(route) + } + + time.Sleep(2 * time.Second) + + if q.Len(route) != len(cases) { + t.Errorf("queue length error, want %d, got %d", len(cases), q.Len(route)) + } + + for range cases { + q.Dequeue(route) + q.Ack(route, true) + } + + if q.Len(route) != 0 { + t.Errorf("queue length error, want %d, got %d", 0, q.Len(route)) + } +}