增加无锁队列及对应延迟队列
This commit is contained in:
parent
6a05f8614c
commit
ed4289d224
28
lock_free/delay_queue.go
Normal file
28
lock_free/delay_queue.go
Normal file
@ -0,0 +1,28 @@
|
|||||||
|
package lock_free
|
||||||
|
|
||||||
|
import "time"
|
||||||
|
|
||||||
|
type DelayLkQueue[T any] struct {
|
||||||
|
LkQueue[T]
|
||||||
|
}
|
||||||
|
|
||||||
|
// DelayEnqueue 延迟入队
|
||||||
|
func (q *DelayLkQueue[T]) DelayEnqueue(value T, duration time.Duration) {
|
||||||
|
time.AfterFunc(duration, func() {
|
||||||
|
q.Enqueue(value)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// ContinuousDequeue 持续监听出队通知
|
||||||
|
func (q *DelayLkQueue[T]) ContinuousDequeue(notify ...chan T) {
|
||||||
|
for {
|
||||||
|
value, ok := q.Dequeue()
|
||||||
|
if !ok {
|
||||||
|
time.Sleep(time.Millisecond) // 自旋
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
for _, n := range notify {
|
||||||
|
n <- value
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
75
lock_free/queue.go
Normal file
75
lock_free/queue.go
Normal file
@ -0,0 +1,75 @@
|
|||||||
|
package lock_free
|
||||||
|
|
||||||
|
import (
|
||||||
|
"sync/atomic"
|
||||||
|
"unsafe"
|
||||||
|
)
|
||||||
|
|
||||||
|
// LkQueue 无锁队列
|
||||||
|
type LkQueue[T any] struct {
|
||||||
|
head unsafe.Pointer
|
||||||
|
tail unsafe.Pointer
|
||||||
|
}
|
||||||
|
|
||||||
|
// node 节点
|
||||||
|
type node[T any] struct {
|
||||||
|
value T
|
||||||
|
next unsafe.Pointer
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewLkQueue 创建无锁队列
|
||||||
|
func NewLkQueue[T any]() *LkQueue[T] {
|
||||||
|
n := unsafe.Pointer(&node[T]{})
|
||||||
|
return &LkQueue[T]{head: n, tail: n}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Enqueue 入队
|
||||||
|
func (q *LkQueue[T]) Enqueue(value T) {
|
||||||
|
n := &node[T]{value: value}
|
||||||
|
for {
|
||||||
|
tail := load[T](&q.tail)
|
||||||
|
next := load[T](&tail.next)
|
||||||
|
if tail == load[T](&q.tail) { // tail 和 next 是否一致
|
||||||
|
if next == nil {
|
||||||
|
if cas(&tail.next, next, n) {
|
||||||
|
cas(&q.tail, tail, n) // 入队完成。设置 tail
|
||||||
|
return
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
cas(&q.tail, tail, next)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Dequeue 出队
|
||||||
|
func (q *LkQueue[T]) Dequeue() (value T, ok bool) {
|
||||||
|
for {
|
||||||
|
head := load[T](&q.head)
|
||||||
|
tail := load[T](&q.tail)
|
||||||
|
next := load[T](&head.next)
|
||||||
|
if head == load[T](&q.head) { // 检查 head、tail 和 next 是否一致
|
||||||
|
if head == tail { // 队列为空,或者 tail 还未到队尾
|
||||||
|
if next == nil { // 为空
|
||||||
|
return value, false
|
||||||
|
}
|
||||||
|
cas(&q.tail, tail, next) // 将 tail 往队尾移动
|
||||||
|
} else {
|
||||||
|
value = next.value
|
||||||
|
if cas(&q.head, head, next) {
|
||||||
|
return value, true // 出队完成
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// load 读取节点的值
|
||||||
|
func load[T any](p *unsafe.Pointer) *node[T] {
|
||||||
|
return (*node[T])(atomic.LoadPointer(p))
|
||||||
|
}
|
||||||
|
|
||||||
|
// cas 原子地修改节点的值
|
||||||
|
func cas[T any](p *unsafe.Pointer, old, new *node[T]) bool {
|
||||||
|
return atomic.CompareAndSwapPointer(p, unsafe.Pointer(old), unsafe.Pointer(new))
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user