You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

32 KiB

02 | Mutex庖丁解牛看实现

你好,我是鸟窝。

上一讲我们一起体验了Mutex的使用竟是那么简单只有简简单单两个方法Lock和Unlock进入临界区之前调用Lock方法退出临界区的时候调用Unlock方法。这个时候你一定会有一丝好奇“它的实现是不是也很简单呢

其实不是的。如果你阅读Go标准库里Mutex的源代码并且追溯Mutex的演进历史你会发现从一个简单易于理解的互斥锁的实现到一个非常复杂的数据结构这是一个逐步完善的过程。Go开发者们做了种种努力精心设计。我自己每次看都会被这种匠心和精益求精的精神打动。

所以今天我就想带着你一起去探索Mutex的实现及演进之路希望你能和我一样体验到这种技术追求的美妙。我们从Mutex的一个简单实现开始看看它是怎样逐步提升性能和公平性的。在这个过程中我们可以学习如何逐步设计一个完善的同步原语并能对复杂度、性能、结构设计的权衡考量有新的认识。经过这样一个学习我们不仅能通透掌握Mutex更好地使用这个工具同时对我们自己设计并发数据接口也非常有帮助。

那具体怎么来讲呢我把Mutex的架构演进分成了四个阶段下面给你画了一张图来说明。

初版”的Mutex使用一个flag来表示锁是否被持有实现比较简单后来照顾到新来的goroutine所以会让新的goroutine也尽可能地先获取到锁这是第二个阶段我把它叫作“给新人机会”;那么,接下来就是第三阶段“多给些机会照顾新来的和被唤醒的goroutine但是这样会带来饥饿问题所以目前又加入了饥饿的解决方案也就是第四阶段“解决饥饿”。

有了这四个阶段我们学习的路径就清晰了那接下来我会从代码层面带你领略Go开发者这些大牛们是如何逐步解决这些问题的。

初版的互斥锁

我们先来看怎么实现一个最简单的互斥锁。在开始之前,你可以先想一想,如果是你,你会怎么设计呢?

你可能会想到可以通过一个flag变量标记当前的锁是否被某个goroutine持有。如果这个flag的值是1就代表锁已经被持有那么其它竞争的goroutine只能等待如果这个flag的值是0就可以通过CAScompare-and-swap或者compare-and-set将这个flag设置为1标识锁被当前的这个goroutine持有了。

实际上Russ Cox在2008年提交的第一版Mutex就是这样实现的。

   // CAS操作当时还没有抽象出atomic包
    func cas(val *int32, old, new int32) bool
    func semacquire(*int32)
    func semrelease(*int32)
    // 互斥锁的结构,包含两个字段
    type Mutex struct {
        key  int32 // 锁是否被持有的标识
        sema int32 // 信号量专用,用以阻塞/唤醒goroutine
    }
    
    // 保证成功在val上增加delta的值
    func xadd(val *int32, delta int32) (new int32) {
        for {
            v := *val
            if cas(val, v, v+delta) {
                return v + delta
            }
        }
        panic("unreached")
    }
    
    // 请求锁
    func (m *Mutex) Lock() {
        if xadd(&m.key, 1) == 1 { //标识加1如果等于1成功获取到锁
            return
        }
        semacquire(&m.sema) // 否则阻塞等待
    }
    
    func (m *Mutex) Unlock() {
        if xadd(&m.key, -1) == 0 { // 将标识减去1如果等于0则没有其它等待者
            return
        }
        semrelease(&m.sema) // 唤醒其它阻塞的goroutine
    }    

这里呢我先简单补充介绍下刚刚提到的CAS。

CAS指令将给定的值一个内存地址中的值进行比较,如果它们是同一个值,就使用新值替换内存地址中的值,这个操作是原子性的。那啥是原子性呢?如果你还不太理解这个概念,那么在这里只需要明确一点就行了,那就是原子性保证这个指令总是基于最新的值进行计算如果同时有其它线程已经修改了这个值那么CAS会返回失败

CAS是实现互斥锁和同步原语的基础我们很有必要掌握它。

好了,我们继续来分析下刚才的这段代码。

虽然当时的Go语法和现在的稍微有些不同而且标准库的布局、实现和现在的也有很大的差异但是这些差异不会影响我们对代码的理解因为最核心的结构体struct和函数、方法的定义几乎是一样的。

Mutex 结构体包含两个字段:

  • **字段key**是一个flag用来标识这个排外锁是否被某个goroutine所持有如果key大于等于1说明这个排外锁已经被持有
  • **字段sema**是个信号量变量用来控制等待goroutine的阻塞休眠和唤醒。

调用Lock请求锁的时候通过xadd方法进行CAS操作第24行xadd方法通过循环执行CAS操作直到成功保证对key加1的操作成功完成。如果比较幸运锁没有被别的goroutine持有那么Lock方法成功地将key设置为1这个goroutine就持有了这个锁如果锁已经被别的goroutine持有了那么当前的goroutine会把key加1而且还会调用semacquire方法第27行使用信号量将自己休眠等锁释放的时候信号量会将它唤醒。

持有锁的goroutine调用Unlock释放锁时它会将key减1第31行。如果当前没有其它等待这个锁的goroutine这个方法就返回了。但是如果还有等待此锁的其它goroutine那么它会调用semrelease方法第34行利用信号量唤醒等待锁的其它goroutine中的一个。

所以到这里我们就知道了初版的Mutex利用CAS原子操作对key这个标志量进行设置。key不仅仅标识了锁是否被goroutine所持有还记录了当前持有和等待获取锁的goroutine的数量。

Mutex的整体设计非常简洁学习起来一点也没有障碍。但是注意我要划重点了。

Unlock方法可以被任意的goroutine调用释放锁即使是没持有这个互斥锁的goroutine也可以进行这个操作。这是因为Mutex本身并没有包含持有这把锁的goroutine的信息所以Unlock也不会对此进行检查。Mutex的这个设计一直保持至今。

这就带来了一个有趣而危险的功能。为什么这么说呢?

你看其它goroutine可以强制释放锁这是一个非常危险的操作因为在临界区的goroutine可能不知道锁已经被释放了还会继续执行临界区的业务操作这可能会带来意想不到的结果因为这个goroutine还以为自己持有锁呢有可能导致data race问题。

所以我们在使用Mutex的时候必须要保证goroutine尽可能不去释放自己未持有的锁一定要遵循“谁申请,谁释放”的原则。在真实的实践中,我们使用互斥锁的时候,很少在一个方法中单独申请锁,而在另外一个方法中单独释放锁,一般都会在同一个方法中获取锁和释放锁。

如果你接触过其它语言比如Java语言的互斥锁的实现就会发现这一点和其它语言的互斥锁不同所以如果是从其它语言转到Go语言开发的同学一定要注意。

以前我们经常会基于性能的考虑及时释放掉锁所以在一些if-else分支中加上释放锁的代码代码看起来很臃肿。而且在重构的时候也很容易因为误删或者是漏掉而出现死锁的现象。

type Foo struct {
    mu    sync.Mutex
    count int
}

func (f *Foo) Bar() {
    f.mu.Lock()

    if f.count < 1000 {
        f.count += 3
        f.mu.Unlock() // 此处释放锁
        return
    }

    f.count++
    f.mu.Unlock() // 此处释放锁
    return
}

从1.14版本起Go对defer做了优化采用更有效的内联方式取代之前的生成defer对象到defer chain中defer对耗时的影响微乎其微了所以基本上修改成下面简洁的写法也没问题

func (f *Foo) Bar() {
    f.mu.Lock()
    defer f.mu.Unlock()


    if f.count < 1000 {
        f.count += 3
        return
    }


    f.count++
    return
}

这样做的好处就是Lock/Unlock总是成对紧凑出现不会遗漏或者多调用代码更少。

但是如果临界区只是方法中的一部分为了尽快释放锁还是应该第一时间调用Unlock而不是一直等到方法返回时才释放。

初版的Mutex实现之后Go开发组又对Mutex做了一些微调比如把字段类型变成了uint32类型调用Unlock方法会做检查使用atomic包的同步原语执行原子操作等等这些小的改动都不是核心功能你简单知道就行了我就不详细介绍了。

但是初版的Mutex实现有一个问题请求锁的goroutine会排队等待获取互斥锁。虽然这貌似很公平但是从性能上来看却不是最优的。因为如果我们能够把锁交给正在占用CPU时间片的goroutine的话那就不需要做上下文的切换在高并发的情况下可能会有更好的性能。

接下来我们就继续探索Go开发者是怎么解决这个问题的。

给新人机会

Go开发者在2011年6月30日的commit中对Mutex做了一次大的调整调整后的Mutex实现如下

   type Mutex struct {
        state int32
        sema  uint32
    }


    const (
        mutexLocked = 1 << iota // mutex is locked
        mutexWoken
        mutexWaiterShift = iota
    )

虽然Mutex结构体还是包含两个字段但是第一个字段已经改成了state它的含义也不一样了。

state是一个复合型的字段一个字段包含多个意义这样可以通过尽可能少的内存来实现互斥锁。这个字段的第一位最小的一位来表示这个锁是否被持有第二位代表是否有唤醒的goroutine剩余的位数代表的是等待此锁的goroutine数。所以state这一个字段被分成了三部分代表三个数据。

请求锁的方法Lock也变得复杂了。复杂之处不仅仅在于对字段state的操作难以理解而且代码逻辑也变得相当复杂。

   func (m *Mutex) Lock() {
        // Fast path: 幸运case能够直接获取到锁
        if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
            return
        }

        awoke := false
        for {
            old := m.state
            new := old | mutexLocked // 新状态加锁
            if old&mutexLocked != 0 {
                new = old + 1<<mutexWaiterShift //等待者数量加一
            }
            if awoke {
                // goroutine是被唤醒的
                // 新状态清除唤醒标志
                new &^= mutexWoken
            }
            if atomic.CompareAndSwapInt32(&m.state, old, new) {//设置新状态
                if old&mutexLocked == 0 { // 锁原状态未加锁
                    break
                }
                runtime.Semacquire(&m.sema) // 请求信号量
                awoke = true
            }
        }
    }

首先是通过CAS检测state字段中的标志第3行如果没有goroutine持有锁也没有等待持有锁的gorutine那么当前的goroutine就很幸运可以直接获得锁这也是注释中的Fast path的意思。

如果不够幸运state不是零值那么就通过一个循环进行检查。接下来的第7行到第26行这段代码虽然只有几行但是理解起来却要费一番功夫因为涉及到对state不同标志位的操作。这里的位操作以及操作后的结果和数值比较并没有明确的解释有时候你需要根据后续的处理进行推断。所以说如果你充分理解了这段代码那么对最新版的Mutex也会比较容易掌握了因为你已经清楚了这些位操作的含义。

我们先前知道如果想要获取锁的goroutine没有机会获取到锁就会进行休眠但是在锁释放唤醒之后它并不能像先前一样直接获取到锁还是要和正在请求锁的goroutine进行竞争。这会给后来请求锁的goroutine一个机会也让CPU中正在执行的goroutine有更多的机会获取到锁在一定程度上提高了程序的性能。

for循环是不断尝试获取锁如果获取不到就通过runtime.Semacquire(&m.sema)休眠休眠醒来之后awoke置为true尝试争抢锁。

代码中的第10行将当前的flag设置为加锁状态如果能成功地通过CAS把这个新值赋予state第19行和第20行就代表抢夺锁的操作成功了。

不过需要注意的是如果成功地设置了state的值但是之前的state是有锁的状态那么state只是清除mutexWoken标志或者增加一个waiter而已。

请求锁的goroutine有两类一类是新来请求锁的goroutine另一类是被唤醒的等待请求锁的goroutine。锁的状态也有两种加锁和未加锁。我用一张表格来说明一下goroutine不同来源不同状态下的处理逻辑。

刚刚说的都是获取锁接下来我们再来看看释放锁。释放锁的Unlock方法也有些复杂我们来看一下。

   func (m *Mutex) Unlock() {
        // Fast path: drop lock bit.
        new := atomic.AddInt32(&m.state, -mutexLocked) //去掉锁标志
        if (new+mutexLocked)&mutexLocked == 0 { //本来就没有加锁
            panic("sync: unlock of unlocked mutex")
        }
    
        old := new
        for {
            if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken) != 0 { // 没有等待者或者有唤醒的waiter或者锁原来已加锁
                return
            }
            new = (old - 1<<mutexWaiterShift) | mutexWoken // 新状态准备唤醒goroutine并设置唤醒标志
            if atomic.CompareAndSwapInt32(&m.state, old, new) {
                runtime.Semrelease(&m.sema)
                return
            }
            old = m.state
        }
    }

下面我来给你解释一下这个方法。

第3行是尝试将持有锁的标识设置为未加锁的状态这是通过减1而不是将标志位置零的方式实现。第4到6行还会检测原来锁的状态是否已经未加锁的状态如果是Unlock一个未加锁的Mutex会直接panic。

不过即使将加锁置为未加锁的状态这个方法也不能直接返回还需要一些额外的操作因为还可能有一些等待这个锁的goroutine有时候我也把它们称之为waiter需要通过信号量的方式唤醒它们中的一个。所以接下来的逻辑有两种情况。

第一种情况如果没有其它的waiter说明对这个锁的竞争的goroutine只有一个那就可以直接返回了如果这个时候有唤醒的goroutine或者是又被别人加了锁那么无需我们操劳其它goroutine自己干得都很好当前的这个goroutine就可以放心返回了。

第二种情况如果有等待者并且没有唤醒的waiter那就需要唤醒一个等待的waiter。在唤醒之前需要将waiter数量减1并且将mutexWoken标志设置上这样Unlock就可以返回了。

通过这样复杂的检查、判断和设置,我们就可以安全地将一把互斥锁释放了。

相对于初版的设计这次的改动主要就是新来的goroutine也有机会先获取到锁甚至一个goroutine可能连续获取到锁打破了先来先得的逻辑。但是代码复杂度也显而易见。

虽然这一版的Mutex已经给新来请求锁的goroutine一些机会让它参与竞争没有空闲的锁或者竞争失败才加入到等待队列中。但是其实还可以进一步优化。我们接着往下看。

多给些机会

在2015年2月的改动中如果新来的goroutine或者是被唤醒的goroutine首次获取不到锁它们就会通过自旋spin通过循环不断尝试spin的逻辑是在runtime实现的)的方式,尝试检查锁是否被释放。在尝试一定的自旋次数后,再执行原来的逻辑。

   func (m *Mutex) Lock() {
        // Fast path: 幸运之路,正好获取到锁
        if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
            return
        }

        awoke := false
        iter := 0
        for { // 不管是新来的请求锁的goroutine, 还是被唤醒的goroutine都不断尝试请求锁
            old := m.state // 先保存当前锁的状态
            new := old | mutexLocked // 新状态设置加锁标志
            if old&mutexLocked != 0 { // 锁还没被释放
                if runtime_canSpin(iter) { // 还可以自旋
                    if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
                        atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
                        awoke = true
                    }
                    runtime_doSpin()
                    iter++
                    continue // 自旋,再次尝试请求锁
                }
                new = old + 1<<mutexWaiterShift
            }
            if awoke { // 唤醒状态
                if new&mutexWoken == 0 {
                    panic("sync: inconsistent mutex state")
                }
                new &^= mutexWoken // 新状态清除唤醒标记
            }
            if atomic.CompareAndSwapInt32(&m.state, old, new) {
                if old&mutexLocked == 0 { // 旧状态锁已释放,新状态成功持有了锁,直接返回
                    break
                }
                runtime_Semacquire(&m.sema) // 阻塞等待
                awoke = true // 被唤醒
                iter = 0
            }
        }
    }

这次的优化增加了第13行到21行、第25行到第27行以及第36行。我来解释一下主要的逻辑也就是第13行到21行。

如果可以spin的话第9行的for循环会重新检查锁是否释放。对于临界区代码执行非常短的场景来说这是一个非常好的优化。因为临界区的代码耗时很短锁很快就能释放而抢夺锁的goroutine不用通过休眠唤醒方式等待调度直接spin几次可能就获得了锁。

解决饥饿

经过几次优化Mutex的代码越来越复杂应对高并发争抢锁的场景也更加公平。但是你有没有想过因为新来的goroutine也参与竞争有可能每次都会被新来的goroutine抢到获取锁的机会在极端情况下等待中的goroutine可能会一直获取不到锁这就是饥饿问题

说到这儿,我突然想到了最近看到的一种叫做鹳的鸟。如果鹳妈妈寻找食物很艰难,找到的食物只够一个幼鸟吃的,鹳妈妈就会把食物给最强壮的一只,这样一来,饥饿弱小的幼鸟总是得不到食物吃,最后就会被啄出巢去。

先前版本的Mutex遇到的也是同样的困境“悲惨”的goroutine总是得不到锁。

Mutex不能容忍这种事情发生。所以2016年Go 1.9中Mutex增加了饥饿模式让锁变得更公平不公平的等待时间限制在1毫秒并且修复了一个大Bug总是把唤醒的goroutine放在等待队列的尾部会导致更加不公平的等待时间。

之后2018年Go开发者将fast path和slow path拆成独立的方法以便内联提高性能。2019年也有一个Mutex的优化虽然没有对Mutex做修改但是对于Mutex唤醒后持有锁的那个waiter调度器可以有更高的优先级去执行这已经是很细致的性能优化了。

为了避免代码过多这里只列出当前的Mutex实现。想要理解当前的Mutex我们需要好好泡一杯茶仔细地品一品了。

当然现在的Mutex代码已经复杂得接近不可读的状态了而且代码也非常长删减后占了几乎三页纸。但是作为第一个要详细介绍的同步原语我还是希望能更清楚地剖析Mutex的实现向你展示它的演化和为了一个貌似很小的feature不得不将代码变得非常复杂的原因。

当然,你也可以暂时略过这一段,以后慢慢品,只需要记住Mutex绝不容忍一个goroutine被落下永远没有机会获取锁。不抛弃不放弃是它的宗旨而且它也尽可能地让等待较长的goroutine更有机会获取到锁

   type Mutex struct {
        state int32
        sema  uint32
    }
    
    const (
        mutexLocked = 1 << iota // mutex is locked
        mutexWoken
        mutexStarving // 从state字段中分出一个饥饿标记
        mutexWaiterShift = iota
    
        starvationThresholdNs = 1e6
    )
    
    func (m *Mutex) Lock() {
        // Fast path: 幸运之路,一下就获取到了锁
        if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
            return
        }
        // Slow path缓慢之路尝试自旋竞争或饥饿状态下饥饿goroutine竞争
        m.lockSlow()
    }
    
    func (m *Mutex) lockSlow() {
        var waitStartTime int64
        starving := false // 此goroutine的饥饿标记
        awoke := false // 唤醒标记
        iter := 0 // 自旋次数
        old := m.state // 当前的锁的状态
        for {
            // 锁是非饥饿状态,锁还没被释放,尝试自旋
            if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
                if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
                    atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
                    awoke = true
                }
                runtime_doSpin()
                iter++
                old = m.state // 再次获取锁的状态,之后会检查是否锁被释放了
                continue
            }
            new := old
            if old&mutexStarving == 0 {
                new |= mutexLocked // 非饥饿状态,加锁
            }
            if old&(mutexLocked|mutexStarving) != 0 {
                new += 1 << mutexWaiterShift // waiter数量加1
            }
            if starving && old&mutexLocked != 0 {
                new |= mutexStarving // 设置饥饿状态
            }
            if awoke {
                if new&mutexWoken == 0 {
                    throw("sync: inconsistent mutex state")
                }
                new &^= mutexWoken // 新状态清除唤醒标记
            }
            // 成功设置新状态
            if atomic.CompareAndSwapInt32(&m.state, old, new) {
                // 原来锁的状态已释放,并且不是饥饿状态,正常请求到了锁,返回
                if old&(mutexLocked|mutexStarving) == 0 {
                    break // locked the mutex with CAS
                }
                // 处理饥饿状态

                // 如果以前就在队列里面,加入到队列头
                queueLifo := waitStartTime != 0
                if waitStartTime == 0 {
                    waitStartTime = runtime_nanotime()
                }
                // 阻塞等待
                runtime_SemacquireMutex(&m.sema, queueLifo, 1)
                // 唤醒之后检查锁是否应该处于饥饿状态
                starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
                old = m.state
                // 如果锁已经处于饥饿状态,直接抢到锁,返回
                if old&mutexStarving != 0 {
                    if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
                        throw("sync: inconsistent mutex state")
                    }
                    // 有点绕加锁并且将waiter数减1
                    delta := int32(mutexLocked - 1<<mutexWaiterShift)
                    if !starving || old>>mutexWaiterShift == 1 {
                        delta -= mutexStarving // 最后一个waiter或者已经不饥饿了清除饥饿标记
                    }
                    atomic.AddInt32(&m.state, delta)
                    break
                }
                awoke = true
                iter = 0
            } else {
                old = m.state
            }
        }
    }
    
    func (m *Mutex) Unlock() {
        // Fast path: drop lock bit.
        new := atomic.AddInt32(&m.state, -mutexLocked)
        if new != 0 {
            m.unlockSlow(new)
        }
    }
    
    func (m *Mutex) unlockSlow(new int32) {
        if (new+mutexLocked)&mutexLocked == 0 {
            throw("sync: unlock of unlocked mutex")
        }
        if new&mutexStarving == 0 {
            old := new
            for {
                if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
                    return
                }
                new = (old - 1<<mutexWaiterShift) | mutexWoken
                if atomic.CompareAndSwapInt32(&m.state, old, new) {
                    runtime_Semrelease(&m.sema, false, 1)
                    return
                }
                old = m.state
            }
        } else {
            runtime_Semrelease(&m.sema, true, 1)
        }
    }

跟之前的实现相比当前的Mutex最重要的变化就是增加饥饿模式。第12行将饥饿模式的最大等待时间阈值设置成了1毫秒这就意味着一旦等待者等待的时间超过了这个阈值Mutex的处理就有可能进入饥饿模式优先让等待者先获取到锁新来的同学主动谦让一下给老同志一些机会。

通过加入饥饿模式可以避免把机会全都留给新来的goroutine保证了请求锁的goroutine获取锁的公平性对于我们使用锁的业务代码来说不会有业务一直等待锁不被处理。

那么,接下来的部分就是选学内容了。如果你还有精力,并且对饥饿模式很感兴趣,那就跟着我一起继续来挑战吧。如果你现在理解起来觉得有困难,也没关系,后面可以随时回来复习。

饥饿模式和正常模式

Mutex可能处于两种操作模式下正常模式饥饿模式

接下来我们分析一下Mutex对饥饿模式和正常模式的处理。

请求锁时调用的Lock方法中一开始是fast path这是一个幸运的场景当前的goroutine幸运地获得了锁没有竞争直接返回否则就进入了lockSlow方法。这样的设计方便编译器对Lock方法进行内联你也可以在程序开发中应用这个技巧。

正常模式下waiter都是进入先入先出队列被唤醒的waiter并不会直接持有锁而是要和新来的goroutine进行竞争。新来的goroutine有先天的优势它们正在CPU中运行可能它们的数量还不少所以在高并发情况下被唤醒的waiter可能比较悲剧地获取不到锁这时它会被插入到队列的前面。如果waiter获取不到锁的时间超过阈值1毫秒那么这个Mutex就进入到了饥饿模式。

在饥饿模式下Mutex的拥有者将直接把锁交给队列最前面的waiter。新来的goroutine不会尝试获取锁即使看起来锁没有被持有它也不会去抢也不会spin它会乖乖地加入到等待队列的尾部。

如果拥有Mutex的waiter发现下面两种情况的其中之一它就会把这个Mutex转换成正常模式:

  • 此waiter已经是队列中的最后一个waiter了没有其它的等待锁的goroutine了
  • 此waiter的等待时间小于1毫秒。

正常模式拥有更好的性能因为即使有等待抢锁的waitergoroutine也可以连续多次获取到锁。

饥饿模式是对公平性和性能的一种平衡它避免了某些goroutine长时间的等待锁。在饥饿模式下优先对待的是那些一直在等待的waiter。

接下来,我们逐步分析下Mutex代码的关键行彻底搞清楚饥饿模式的细节

我们从请求锁lockSlow的逻辑看起。

第9行对state字段又分出了一位用来标记锁是否处于饥饿状态。现在一个state的字段被划分成了阻塞等待的waiter数量、饥饿标记、唤醒标记和持有锁的标记四个部分。

第25行记录此goroutine请求锁的初始时间第26行标记是否处于饥饿状态第27行标记是否是唤醒的第28行记录spin的次数。

第31行到第40行和以前的逻辑类似只不过加了一个不能是饥饿状态的逻辑。它会对正常状态抢夺锁的goroutine尝试spin和以前的目的一样就是在临界区耗时很短的情况下提高性能。

第42行到第44行非饥饿状态下抢锁。怎么抢就是要把state的锁的那一位置为加锁状态后续CAS如果成功就可能获取到了锁。

第46行到第48行如果锁已经被持有或者锁处于饥饿状态我们最好的归宿就是等待所以waiter的数量加1。

第49行到第51行如果此goroutine已经处在饥饿状态并且锁还被持有那么我们需要把此Mutex设置为饥饿状态。

第52行到第57行是清除mutexWoken标记因为不管是获得了锁还是进入休眠我们都需要清除mutexWoken标记。

第59行就是尝试使用CAS设置state。如果成功第61行到第63行是检查原来的锁的状态是未加锁状态并且也不是饥饿状态的话就成功获取了锁返回。

第67行判断是否第一次加入到waiter队列。到这里你应该就能明白第25行为什么不对waitStartTime进行初始化了我们需要利用它在这里进行条件判断。

第72行将此waiter加入到队列如果是首次加入到队尾先进先出。如果不是首次那么加入到队首这样等待最久的goroutine优先能够获取到锁。此goroutine会进行休眠。

第74行判断此goroutine是否处于饥饿状态。注意执行这一句的时候它已经被唤醒了。

第77行到第88行是对锁处于饥饿状态下的一些处理。

第82行设置一个标志这个标志稍后会用来加锁而且还会将waiter数减1。

第84行设置标志在没有其它的waiter或者此goroutine等待还没超过1毫秒则会将Mutex转为正常状态。

第86行则是将这个标识应用到state字段上。

释放锁Unlock时调用的Unlock的fast path不用多少所以我们主要看unlockSlow方法就行。

如果Mutex处于饥饿状态第123行直接唤醒等待队列中的waiter。

如果Mutex处于正常状态如果没有waiter或者已经有在处理的情况了那么释放就好不做额外的处理第112行到第114行

否则waiter数减1mutexWoken标志设置上通过CAS更新state的值第115行到第119行

总结

“罗马不是一天建成的”Mutex的设计也是从简单设计到复杂处理逐渐演变的。初版的Mutex设计非常简洁充分展示了Go创始者的简单、简洁的设计哲学。但是随着大家的使用逐渐暴露出一些缺陷为了弥补这些缺陷Mutex不得不越来越复杂。

有一点值得我们学习的同时也体现了Go创始者的哲学就是他们强调Go语言和标准库的稳定性新版本要向下兼容用新的版本总能编译老的代码。Go语言从出生到现在已经10多年了这个Mutex对外的接口却没有变化依然向下兼容即使现在Go出了两个版本每个版本也会向下兼容保持Go语言的稳定性你也能领悟他们软件开发和设计的思想。

还有一点你也可以观察到为了一个程序20%的特性你可能需要添加80%的代码,这也是程序越来越复杂的原因。所以,最开始的时候,如果能够有一个清晰而且易于扩展的设计,未来增加新特性时,也会更加方便。

思考题

最后,给你留两个小问题:

  1. 目前Mutex的state字段有几个意义这几个意义分别是由哪些字段表示的
  2. 等待一个Mutex的goroutine数最大是多少是否能满足现实的需求

欢迎在留言区写下你的思考和答案,我们一起交流讨论。如果你觉得有所收获,也欢迎你把今天的内容分享给你的朋友或同事。