You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

17 KiB

07 | Cond条件变量的实现机制及避坑指南

你好,我是鸟窝。

在写Go程序之前我曾经写了10多年的Java程序也面试过不少Java程序员。在Java面试中经常被问到的一个知识点就是等待/通知wait/notify机制。面试官经常会这样考察候选人请实现一个限定容量的队列queue当队列满或者空的时候利用等待/通知机制实现阻塞或者唤醒。

在Go中也可以实现一个类似的限定容量的队列而且实现起来也比较简单只要用条件变量Cond并发原语就可以。Cond并发原语相对来说不是那么常用但是在特定的场景使用会事半功倍比如你需要在唤醒一个或者所有的等待者做一些检查操作的时候。

那么今天这一讲我们就学习下Cond这个并发原语。

Go标准库的Cond

Go标准库提供Cond原语的目的是为等待/通知场景下的并发问题提供支持。Cond通常应用于等待某个条件的一组goroutine等条件变为true的时候其中一个goroutine或者所有的goroutine都会被唤醒执行。

顾名思义Cond是和某个条件相关这个条件需要一组goroutine协作共同完成在条件还没有满足的时候所有等待这个条件的goroutine都会被阻塞住只有这一组goroutine通过协作达到了这个条件等待的goroutine才可能继续进行下去。

那这里等待的条件是什么呢等待的条件可以是某个变量达到了某个阈值或者某个时间点也可以是一组变量分别都达到了某个阈值还可以是某个对象的状态满足了特定的条件。总结来讲等待的条件是一种可以用来计算结果是true还是false的条件。

从开发实践上我们真正使用Cond的场景比较少因为一旦遇到需要使用Cond的场景我们更多地会使用Channel的方式我会在第12和第13讲展开Channel的用法去实现因为那才是更地道的Go语言的写法甚至Go的开发者有个“把Cond从标准库移除”的提议issue 21165。而有的开发者认为Cond是唯一难以掌握的Go并发原语。至于其中原因我先卖个关子到这一讲的后半部分我再和你解释。

今天这一讲我们就带你仔细地学一学Cond这个并发原语吧。

Cond的基本用法

标准库中的Cond并发原语初始化的时候需要关联一个Locker接口的实例一般我们使用Mutex或者RWMutex。

我们看一下Cond的实现

type Cond
  func NeWCond(l Locker) *Cond
  func (c *Cond) Broadcast()
  func (c *Cond) Signal()
  func (c *Cond) Wait()

首先Cond关联的Locker实例可以通过c.L访问它内部维护着一个先入先出的等待队列。

然后我们分别看下它的三个方法Broadcast、Signal和Wait方法。

Signal方法允许调用者Caller唤醒一个等待此Cond的goroutine。如果此时没有等待的goroutine显然无需通知waiter如果Cond等待队列中有一个或者多个等待的goroutine则需要从等待队列中移除第一个goroutine并把它唤醒。在其他编程语言中比如Java语言中Signal方法也被叫做notify方法。

调用Signal方法时不强求你一定要持有c.L的锁。

Broadcast方法允许调用者Caller唤醒所有等待此Cond的goroutine。如果此时没有等待的goroutine显然无需通知waiter如果Cond等待队列中有一个或者多个等待的goroutine则清空所有等待的goroutine并全部唤醒。在其他编程语言中比如Java语言中Broadcast方法也被叫做notifyAll方法。

同样地调用Broadcast方法时也不强求你一定持有c.L的锁。

Wait方法会把调用者Caller放入Cond的等待队列中并阻塞直到被Signal或者Broadcast的方法从等待队列中移除并唤醒。

调用Wait方法时必须要持有c.L的锁。

Go实现的sync.Cond的方法名是Wait、Signal和Broadcast这是计算机科学中条件变量的通用方法名。比如C语言中对应的方法名是pthread_cond_wait、pthread_cond_signal和 pthread_cond_broadcast。

知道了Cond提供的三个方法后我们再通过一个百米赛跑开始时的例子来学习下Cond的使用方法。10个运动员进入赛场之后需要先做拉伸活动活动筋骨向观众和粉丝招手致敬在自己的赛道上做好准备等所有的运动员都准备好之后裁判员才会打响发令枪。

每个运动员做好准备之后将ready加一表明自己做好准备了同时调用Broadcast方法通知裁判员。因为裁判员只有一个所以这里可以直接替换成Signal方法调用。调用Broadcast方法的时候我们并没有请求c.L锁只是在更改等待变量的时候才使用到了锁。

裁判员会等待运动员都准备好第22行。虽然每个运动员准备好之后都唤醒了裁判员但是裁判员被唤醒之后需要检查等待条件是否满足运动员都准备好了)。可以看到,裁判员被唤醒之后一定要检查等待条件,如果条件不满足还是要继续等待。

func main() {
    c := sync.NewCond(&sync.Mutex{})
    var ready int

    for i := 0; i < 10; i++ {
        go func(i int) {
            time.Sleep(time.Duration(rand.Int63n(10)) * time.Second)

            // 加锁更改等待条件
            c.L.Lock()
            ready++
            c.L.Unlock()

            log.Printf("运动员#%d 已准备就绪\n", i)
            // 广播唤醒所有的等待者
            c.Broadcast()
        }(i)
    }

    c.L.Lock()
    for ready != 10 {
        c.Wait()
        log.Println("裁判员被唤醒一次")
    }
    c.L.Unlock()

    //所有的运动员是否就绪
    log.Println("所有运动员都准备就绪。比赛开始321, ......")
}

你看Cond的使用其实没那么简单。它的复杂在于这段代码有时候需要加锁有时候可以不加Wait唤醒后需要检查条件条件变量的更改其实是需要原子操作或者互斥锁保护的。所以有的开发者会认为Cond是唯一难以掌握的Go并发原语。

我们继续看看Cond的实现原理。

Cond的实现原理

其实Cond的实现非常简单或者说复杂的逻辑已经被Locker或者runtime的等待队列实现了。我们直接看看Cond的源码吧。

type Cond struct {
    noCopy noCopy

    // 当观察或者修改等待条件的时候需要加锁
    L Locker

    // 等待队列
    notify  notifyList
    checker copyChecker
}

func NewCond(l Locker) *Cond {
    return &Cond{L: l}
}

func (c *Cond) Wait() {
    c.checker.check()
    // 增加到等待队列中
    t := runtime_notifyListAdd(&c.notify)
    c.L.Unlock()
    // 阻塞休眠直到被唤醒
    runtime_notifyListWait(&c.notify, t)
    c.L.Lock()
}

func (c *Cond) Signal() {
    c.checker.check()
    runtime_notifyListNotifyOne(&c.notify)
}

func (c *Cond) Broadcast() {
    c.checker.check()
    runtime_notifyListNotifyAll(&c.notify
}

这部分源码确实很简单,我来带你学习下其中比较关键的逻辑。

runtime_notifyListXXX是运行时实现的方法实现了一个等待/通知的队列。如果你想深入学习这部分可以再去看看runtime/sema.go代码中。

copyChecker是一个辅助结构可以在运行时检查Cond是否被复制使用。

Signal和Broadcast只涉及到notifyList数据结构不涉及到锁。

Wait把调用者加入到等待队列时会释放锁在被唤醒之后还会请求锁。在阻塞休眠期间调用者是不持有锁的这样能让其他goroutine有机会检查或者更新等待变量。

我们继续看看使用Cond常见的两个错误一个是调用Wait的时候没有加锁另一个是没有检查条件是否满足程序就继续执行了。

使用Cond的2个常见错误

我们先看Cond最常见的使用错误也就是调用Wait的时候没有加锁

以前面百米赛跑的程序为例在调用cond.Wait时把前后的Lock/Unlock注释掉如下面的代码中的第20行和第25行

func main() {
    c := sync.NewCond(&sync.Mutex{})
    var ready int

    for i := 0; i < 10; i++ {
        go func(i int) {
            time.Sleep(time.Duration(rand.Int63n(10)) * time.Second)

            // 加锁更改等待条件
            c.L.Lock()
            ready++
            c.L.Unlock()

            log.Printf("运动员#%d 已准备就绪\n", i)
            // 广播唤醒所有的等待者
            c.Broadcast()
        }(i)
    }

    // c.L.Lock()
    for ready != 10 {
        c.Wait()
        log.Println("裁判员被唤醒一次")
    }
    // c.L.Unlock()

    //所有的运动员是否就绪
    log.Println("所有运动员都准备就绪。比赛开始321, ......")
}

再运行程序就会报释放未加锁的panic

出现这个问题的原因在于cond.Wait方法的实现是把当前调用者加入到notify队列之中后会释放锁如果不释放锁其他Wait的调用者就没有机会加入到notify队列中了然后一直等待等调用者被唤醒之后又会去争抢这把锁。如果调用Wait之前不加锁的话就有可能Unlock一个未加锁的Locker。所以切记调用cond.Wait方法之前一定要加锁

使用Cond的另一个常见错误是只调用了一次Wait没有检查等待条件是否满足结果条件没满足程序就继续执行了。出现这个问题的原因在于误以为Cond的使用就像WaitGroup那样调用一下Wait方法等待那么简单。比如下面的代码中把第21行和第24行注释掉

func main() {
    c := sync.NewCond(&sync.Mutex{})
    var ready int

    for i := 0; i < 10; i++ {
        go func(i int) {
            time.Sleep(time.Duration(rand.Int63n(10)) * time.Second)

            // 加锁更改等待条件
            c.L.Lock()
            ready++
            c.L.Unlock()

            log.Printf("运动员#%d 已准备就绪\n", i)
            // 广播唤醒所有的等待者
            c.Broadcast()
        }(i)
    }

    c.L.Lock()
    // for ready != 10 {
    c.Wait()
    log.Println("裁判员被唤醒一次")
    // }
    c.L.Unlock()

    //所有的运动员是否就绪
    log.Println("所有运动员都准备就绪。比赛开始321, ......")
}

运行这个程序,你会发现,可能只有几个运动员准备好之后程序就运行完了,而不是我们期望的所有运动员都准备好才进行下一步。原因在于,每一个运动员准备好之后都会唤醒所有的等待者,也就是这里的裁判员,比如第一个运动员准备好后就唤醒了裁判员,结果这个裁判员傻傻地没做任何检查,以为所有的运动员都准备好了,就继续执行了。

所以,我们一定要记住waiter goroutine被唤醒不等于等待条件被满足只是有goroutine把它唤醒了而已等待条件有可能已经满足了也有可能不满足我们需要进一步检查。你也可以理解为等待者被唤醒只是得到了一次检查的机会而已。

到这里我们小结下。如果你想在使用Cond的时候避免犯错只要时刻记住调用cond.Wait方法之前一定要加锁以及waiter goroutine被唤醒不等于等待条件被满足这两个知识点。

知名项目中Cond的使用

Cond在实际项目中被使用的机会比较少原因总结起来有两个。

第一同样的场景我们会使用其他的并发原语来替代。Go特有的Channel类型有一个应用很广泛的模式就是通知机制这个模式使用起来也特别简单。所以很多情况下我们会使用Channel而不是Cond实现wait/notify机制。

第二对于简单的wait/notify场景比如等待一组goroutine完成之后继续执行余下的代码我们会使用WaitGroup来实现。因为WaitGroup的使用方法更简单而且不容易出错。比如上面百米赛跑的问题就可以很方便地使用WaitGroup来实现。

所以我在这一讲开头提到Cond的使用场景很少。先前的标准库内部有几个地方使用了Cond比如io/pipe.go等后来都被其他的并发原语比如Channel替换了sync.Cond的路越走越窄。但是还是有一批忠实的“粉丝”坚持在使用Cond原因在于Cond有三点特性是Channel无法替代的

  • Cond和一个Locker关联可以利用这个Locker对相关的依赖条件更改提供保护。
  • Cond可以同时支持Signal和Broadcast方法而Channel只能同时支持其中一种。
  • Cond的Broadcast方法可以被重复调用。等待条件再次变成不满足的状态后我们又可以调用Broadcast再次唤醒等待的goroutine。这也是Channel不能支持的Channel被close掉了之后不支持再open。

开源项目中使用sync.Cond的代码少之又少包括标准库原先一些使用Cond的代码也改成使用Channel实现了所以别说找Cond相关的使用Bug了想找到的一个使用的例子都不容易我找了Kubernetes中的一个例子我们一起看看它是如何使用Cond的。

Kubernetes项目中定义了优先级队列 PriorityQueue 这样一个数据结构用来实现Pod的调用。它内部有三个Pod的队列即activeQ、podBackoffQ和unschedulableQ其中activeQ就是用来调度的活跃队列heap

Pop方法调用的时候如果这个队列为空并且这个队列没有Close的话会调用Cond的Wait方法等待。

你可以看到调用Wait方法的时候调用者是持有锁的并且被唤醒的时候检查等待条件队列是否为空

// 从队列中取出一个元素
func (p *PriorityQueue) Pop() (*framework.QueuedPodInfo, error) {
		p.lock.Lock()
		defer p.lock.Unlock()
		for p.activeQ.Len() == 0 { // 如果队列为空
			if p.closed {
				return nil, fmt.Errorf(queueClosed)
			}
			p.cond.Wait() // 等待,直到被唤醒
		}
		......
		return pInfo, err
	}


当activeQ增加新的元素时会调用条件变量的Boradcast方法通知被Pop阻塞的调用者。

// 增加元素到队列中
func (p *PriorityQueue) Add(pod *v1.Pod) error {
		p.lock.Lock()
		defer p.lock.Unlock()
		pInfo := p.newQueuedPodInfo(pod)
		if err := p.activeQ.Add(pInfo); err != nil {//增加元素到队列中
			klog.Errorf("Error adding pod %v to the scheduling queue: %v", nsNameForPod(pod), err)
			return err
		}
		......
		p.cond.Broadcast() //通知其它等待的goroutine队列中有元素了

		return nil
	}

这个优先级队列被关闭的时候也会调用Broadcast方法避免被Pop阻塞的调用者永远hang住。

func (p *PriorityQueue) Close() {
		p.lock.Lock()
		defer p.lock.Unlock()
		close(p.stop)
		p.closed = true
		p.cond.Broadcast() //关闭时通知等待的goroutine避免它们永远等待
}

你可以思考一下这里为什么使用Cond这个并发原语能不能换成Channel实现呢

总结

好了,我们来做个总结。

Cond是为等待/通知场景下的并发问题提供支持的。它提供了条件变量的三个基本方法Signal、Broadcast和Wait为并发的goroutine提供等待/通知机制。

在实践中,处理等待/通知的场景时我们常常会使用Channel替换Cond因为Channel类型使用起来更简洁而且不容易出错。但是对于需要重复调用Broadcast的场景比如上面Kubernetes的例子每次往队列中成功增加了元素后就需要调用Broadcast通知所有的等待者使用Cond就再合适不过了。

使用Cond之所以容易出错就是Wait调用需要加锁以及被唤醒后一定要检查条件是否真的已经满足。你需要牢记这两点。

虽然我们讲到的百米赛跑的例子也可以通过WaitGroup来实现但是本质上WaitGroup和Cond是有区别的WaitGroup是主goroutine等待确定数量的子goroutine完成任务而Cond是等待某个条件满足这个条件的修改可以被任意多的goroutine更新而且Cond的Wait不关心也不知道其他goroutine的数量只关心等待条件。而且Cond还有单个通知的机制也就是Signal方法。

思考题

  1. 一个Cond的waiter被唤醒的时候为什么需要再检查等待条件而不是唤醒后进行下一步
  2. 你能否利用Cond实现一个容量有限的queue

欢迎在留言区写下你的思考和答案,我们一起交流讨论。如果你觉得有所收获,也欢迎你把今天的内容分享给你的朋友或同事。