From 84bc7b207b6bb725f44893d4c0dd5218fe41dbec Mon Sep 17 00:00:00 2001 From: fantasticbin Date: Sun, 8 Jun 2025 15:08:42 +0800 Subject: [PATCH] =?UTF-8?q?=E6=96=B0=E5=A2=9E=E4=BF=A1=E5=8F=B7=E9=87=8F?= =?UTF-8?q?=E5=AE=9E=E7=8E=B0=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- semaphore/semaphore.go | 142 ++++++++++++++++++++++++++++++++++++ semaphore/semaphore_chan.go | 25 +++++++ 2 files changed, 167 insertions(+) create mode 100644 semaphore/semaphore.go create mode 100644 semaphore/semaphore_chan.go diff --git a/semaphore/semaphore.go b/semaphore/semaphore.go new file mode 100644 index 0000000..88cf5f6 --- /dev/null +++ b/semaphore/semaphore.go @@ -0,0 +1,142 @@ +package semaphore + +import ( + "container/list" + "context" + "sync" +) + +type waiter struct { + n int64 + ready chan<- struct{} // 唤醒信号 +} + +type Semaphore struct { + size int64 // 资源数量 + cur int64 // 当前已使用的资源数量 + mu sync.Mutex + waiters list.List // 等待队列 +} + +func NewSemaphore(n int64) *Semaphore { + return &Semaphore{ + size: n, + } +} + +func (s *Semaphore) Acquire(ctx context.Context, n int64) error { + done := ctx.Done() + + s.mu.Lock() + // 保证 ctx.Done() happened before Semaphore.Acquire() + select { + case <-done: + s.mu.Unlock() + return ctx.Err() + default: + } + + // 快速路径:如果当前可用资源足够,直接分配 + if s.size-s.cur >= n && s.waiters.Len() == 0 { + s.cur += n + s.mu.Unlock() + return nil + } + + // 慢路径:需要等待释放资源 + return s.acquireSlow(ctx, n) +} + +func (s *Semaphore) acquireSlow(ctx context.Context, n int64) error { + done := ctx.Done() + + // 如果请求的资源数量超过了所能提供的资源数量,则只能依靠 ctx.Done() 来退出 + if n > s.size { + s.mu.Unlock() + <-done + return ctx.Err() + } + + // 资源不足,将调用者加入等待队列 + // 同时创建一个信号通道,用于唤醒等待的调用者 + ready := make(chan struct{}) + w := &waiter{n: n, ready: ready} + elem := s.waiters.PushBack(w) + s.mu.Unlock() + + select { + case <-done: + s.mu.Lock() + select { + case <-ready: + // 如果已经被唤醒,假装已经成功获取资源 + s.cur -= n + s.notifyWaiters() + default: + // 如果还没有被唤醒,从等待队列中移除调用者自己 + isFront := s.waiters.Front() == elem + s.waiters.Remove(elem) + // 如果当前调用者是队列的第一个且有多余资源,唤醒下一个等待者 + if isFront && s.size > s.cur { + s.notifyWaiters() + } + } + s.mu.Unlock() + return ctx.Err() + + case <-ready: + // 成功获取资源,唤醒信号已发送 + select { + case <-done: + s.Release(n) + return ctx.Err() + default: + } + return nil + } +} + +func (s *Semaphore) notifyWaiters() { + for { + next := s.waiters.Front() + if next == nil { + break // 没有等待者 + } + + w := next.Value.(waiter) + if s.size-s.cur < w.n { + // 没有足够的资源满足下一个等待者 + // 没有必要继续唤醒后续等待者,防止有等待者处于饥饿状态 + break + } + + s.cur += w.n + s.waiters.Remove(next) + close(w.ready) // 唤醒等待者 + } +} + +func (s *Semaphore) Release(n int64) { + s.mu.Lock() + s.cur -= n // 释放 n 个资源 + + if s.cur < 0 { + s.mu.Unlock() + panic("semaphore: released more than held") + } + + s.notifyWaiters() // 唤醒等待者 + s.mu.Unlock() +} + +func (s *Semaphore) TryAcquire(n int64) bool { + s.mu.Lock() + // 检查当前可用资源是否足够,并且还没有等待者 + success := s.size-s.cur >= n && s.waiters.Len() == 0 + if success { + s.cur += n // 分配资源 + } + + s.mu.Unlock() + return success +} diff --git a/semaphore/semaphore_chan.go b/semaphore/semaphore_chan.go new file mode 100644 index 0000000..0082301 --- /dev/null +++ b/semaphore/semaphore_chan.go @@ -0,0 +1,25 @@ +package semaphore + +import "sync" + +type SemaChan struct { + sync.Locker + sem chan struct{} // 信号量通道 +} + +func NewSemaChan(n int) *SemaChan { + if n <= 0 { + n = 1 // 确保信号量至少为1,直接变成一个互斥锁 + } + return &SemaChan{ + sem: make(chan struct{}, n), // 初始化信号量通道,容量为n + } +} + +func (s *SemaChan) Lock() { + <-s.sem // 使用接收的方式阻塞,用来与 sync.Mutex 的内存模型保持一致 +} + +func (s *SemaChan) Unlock() { + s.sem <- struct{}{} +}