diff --git a/lock_free/delay_queue.go b/lock_free/delay_queue.go index b7f5394..50328c9 100644 --- a/lock_free/delay_queue.go +++ b/lock_free/delay_queue.go @@ -6,26 +6,26 @@ import ( "time" ) -type DelayLkQueue[TValue any, TKey comparable] struct { +type DelayLkQueue[TKey comparable, TValue any] struct { timers map[TKey]*time.Timer m sync.Mutex LkQueue[TValue] } // NewDelayLkQueue 创建延迟无锁队列 -func NewDelayLkQueue[TValue any, TKey comparable]() *DelayLkQueue[TValue, TKey] { - return &DelayLkQueue[TValue, TKey]{make(map[TKey]*time.Timer), sync.Mutex{}, *NewLkQueue[TValue]()} +func NewDelayLkQueue[TKey comparable, TValue any]() *DelayLkQueue[TKey, TValue] { + return &DelayLkQueue[TKey, TValue]{make(map[TKey]*time.Timer), sync.Mutex{}, *NewLkQueue[TValue]()} } // DelayEnqueue 延迟入队 -func (q *DelayLkQueue[TValue, TKey]) DelayEnqueue(value TValue, duration time.Duration) { +func (q *DelayLkQueue[TKey, TValue]) DelayEnqueue(value TValue, duration time.Duration) { time.AfterFunc(duration, func() { q.Enqueue(value) }) } // CancellableDelayEnqueue 可取消的延迟入队 -func (q *DelayLkQueue[TValue, TKey]) CancellableDelayEnqueue(key TKey, value TValue, duration time.Duration) { +func (q *DelayLkQueue[TKey, TValue]) CancellableDelayEnqueue(key TKey, value TValue, duration time.Duration) { if timer, ok := q.timers[key]; ok { timer.Stop() } @@ -38,7 +38,7 @@ func (q *DelayLkQueue[TValue, TKey]) CancellableDelayEnqueue(key TKey, value TVa } // CancelDelayEnqueue 取消延迟入队 -func (q *DelayLkQueue[TValue, TKey]) CancelDelayEnqueue(key TKey) { +func (q *DelayLkQueue[TKey, TValue]) CancelDelayEnqueue(key TKey) { q.m.Lock() defer q.m.Unlock() if timer, ok := q.timers[key]; ok { @@ -48,7 +48,7 @@ func (q *DelayLkQueue[TValue, TKey]) CancelDelayEnqueue(key TKey) { } // ContinuousDequeue 持续监听出队 -func (q *DelayLkQueue[TValue, TKey]) ContinuousDequeue() iter.Seq[TValue] { +func (q *DelayLkQueue[TKey, TValue]) ContinuousDequeue() iter.Seq[TValue] { return func(yield func(TValue) bool) { for { if value, ok := q.Dequeue(); ok { @@ -63,14 +63,14 @@ func (q *DelayLkQueue[TValue, TKey]) ContinuousDequeue() iter.Seq[TValue] { } // ContinuousDequeueExecute 持续监听出队执行函数 -func (q *DelayLkQueue[TValue, TKey]) ContinuousDequeueExecute(fn func(TValue)) { +func (q *DelayLkQueue[TKey, TValue]) ContinuousDequeueExecute(fn func(TValue)) { for value := range q.ContinuousDequeue() { fn(value) } } // ContinuousDequeueNotify 持续监听出队通知 -func (q *DelayLkQueue[TValue, TKey]) ContinuousDequeueNotify(chs ...chan TValue) { +func (q *DelayLkQueue[TKey, TValue]) ContinuousDequeueNotify(chs ...chan TValue) { q.ContinuousDequeueExecute(func(value TValue) { for _, ch := range chs { ch <- value diff --git a/lock_free/queues.go b/lock_free/queues.go index b557ca8..ffbecf5 100644 --- a/lock_free/queues.go +++ b/lock_free/queues.go @@ -7,106 +7,81 @@ import ( ) // Queues 队列集合 -type Queues[TValue any, TKey, TRoute comparable] struct { - queues map[TRoute]*DelayLkQueue[TValue, TKey] - m sync.RWMutex +type Queues[TKey comparable, TValue, TRoute any] struct { + queues sync.Map // 读多写少的场景,适合用 sync.Map 路由队列 } // NewQueues 创建队列集合 -func NewQueues[TValue any, TKey, TRoute comparable]() *Queues[TValue, TKey, TRoute] { - return &Queues[TValue, TKey, TRoute]{make(map[TRoute]*DelayLkQueue[TValue, TKey]), sync.RWMutex{}} +func NewQueues[TKey comparable, TValue, TRoute any]() *Queues[TKey, TValue, TRoute] { + return &Queues[TKey, TValue, TRoute]{} } // Enqueue 入队 -func (q *Queues[TValue, TKey, TRoute]) Enqueue(route TRoute, value TValue) { - q.m.RLock() - if queue, ok := q.queues[route]; ok { - q.m.RUnlock() - queue.Enqueue(value) +func (q *Queues[TKey, TValue, TRoute]) Enqueue(route TRoute, value TValue) { + if queue, ok := q.queues.Load(route); ok { + queue.(*DelayLkQueue[TKey, TValue]).Enqueue(value) } else { - q.m.RUnlock() - queue := NewDelayLkQueue[TValue, TKey]() - q.m.Lock() - q.queues[route] = queue - q.m.Unlock() + queue := NewDelayLkQueue[TKey, TValue]() + q.queues.Store(route, queue) queue.Enqueue(value) } } // Dequeue 出队 -func (q *Queues[TValue, TKey, TRoute]) Dequeue(route TRoute) (value TValue, ok bool) { - q.m.RLock() - defer q.m.RUnlock() - if queue, ok := q.queues[route]; ok { - return queue.Dequeue() +func (q *Queues[TKey, TValue, TRoute]) Dequeue(route TRoute) (value TValue, ok bool) { + if queue, ok := q.queues.Load(route); ok { + return queue.(*DelayLkQueue[TKey, TValue]).Dequeue() } return value, false } // DelayEnqueue 延迟入队 -func (q *Queues[TValue, TKey, TRoute]) DelayEnqueue(route TRoute, value TValue, duration time.Duration) { - q.m.RLock() - if queue, ok := q.queues[route]; ok { - q.m.RUnlock() - queue.DelayEnqueue(value, duration) +func (q *Queues[TKey, TValue, TRoute]) DelayEnqueue(route TRoute, value TValue, duration time.Duration) { + if queue, ok := q.queues.Load(route); ok { + queue.(*DelayLkQueue[TKey, TValue]).DelayEnqueue(value, duration) } else { - q.m.RUnlock() - queue := NewDelayLkQueue[TValue, TKey]() - q.m.Lock() - q.queues[route] = queue - q.m.Unlock() + queue := NewDelayLkQueue[TKey, TValue]() + q.queues.Store(route, queue) queue.DelayEnqueue(value, duration) } } // CancellableDelayEnqueue 可取消的延迟入队 -func (q *Queues[TValue, TKey, TRoute]) CancellableDelayEnqueue(route TRoute, key TKey, value TValue, duration time.Duration) { - q.m.RLock() - defer q.m.RUnlock() - if queue, ok := q.queues[route]; ok { - queue.CancellableDelayEnqueue(key, value, duration) +func (q *Queues[TKey, TValue, TRoute]) CancellableDelayEnqueue(route TRoute, key TKey, value TValue, duration time.Duration) { + if queue, ok := q.queues.Load(route); ok { + queue.(*DelayLkQueue[TKey, TValue]).CancellableDelayEnqueue(key, value, duration) } else { - queue := NewDelayLkQueue[TValue, TKey]() - q.m.Lock() - q.queues[route] = queue - q.m.Unlock() + queue := NewDelayLkQueue[TKey, TValue]() + q.queues.Store(route, queue) queue.CancellableDelayEnqueue(key, value, duration) } } // CancelDelayEnqueue 取消延迟入队 -func (q *Queues[TValue, TKey, TRoute]) CancelDelayEnqueue(route TRoute, key TKey) { - q.m.RLock() - defer q.m.RUnlock() - if queue, ok := q.queues[route]; ok { - queue.CancelDelayEnqueue(key) +func (q *Queues[TKey, TValue, TRoute]) CancelDelayEnqueue(route TRoute, key TKey) { + if queue, ok := q.queues.Load(route); ok { + queue.(*DelayLkQueue[TKey, TValue]).CancelDelayEnqueue(key) } } // ContinuousDequeue 持续监听出队 -func (q *Queues[TValue, TKey, TRoute]) ContinuousDequeue(route TRoute) iter.Seq[TValue] { - q.m.RLock() - defer q.m.RUnlock() - if queue, ok := q.queues[route]; ok { - return queue.ContinuousDequeue() +func (q *Queues[TKey, TValue, TRoute]) ContinuousDequeue(route TRoute) iter.Seq[TValue] { + if queue, ok := q.queues.Load(route); ok { + return queue.(*DelayLkQueue[TKey, TValue]).ContinuousDequeue() } return nil } // ContinuousDequeueExecute 持续监听出队执行函数 -func (q *Queues[TValue, TKey, TRoute]) ContinuousDequeueExecute(route TRoute, fn func(TValue)) { - q.m.RLock() - defer q.m.RUnlock() - if queue, ok := q.queues[route]; ok { - queue.ContinuousDequeueExecute(fn) +func (q *Queues[TKey, TValue, TRoute]) ContinuousDequeueExecute(route TRoute, fn func(TValue)) { + if queue, ok := q.queues.Load(route); ok { + queue.(*DelayLkQueue[TKey, TValue]).ContinuousDequeueExecute(fn) } } // ContinuousDequeueNotify 持续监听出队通知 -func (q *Queues[TValue, TKey, TRoute]) ContinuousDequeueNotify(route TRoute, chs ...chan TValue) { - q.m.RLock() - defer q.m.RUnlock() - if queue, ok := q.queues[route]; ok { - queue.ContinuousDequeueNotify(chs...) +func (q *Queues[TKey, TValue, TRoute]) ContinuousDequeueNotify(route TRoute, chs ...chan TValue) { + if queue, ok := q.queues.Load(route); ok { + queue.(*DelayLkQueue[TKey, TValue]).ContinuousDequeueNotify(chs...) } } diff --git a/lock_free/queues_test.go b/lock_free/queues_test.go index 32dee54..6e9e4e0 100644 --- a/lock_free/queues_test.go +++ b/lock_free/queues_test.go @@ -14,7 +14,7 @@ func TestQueues(t *testing.T) { {3, time.Second * 3}, } route := "test" - q := NewQueues[int, struct{}, string]() + q := NewQueues[struct{}, int, string]() for _, c := range cases { q.DelayEnqueue(route, c.value, c.duration)