You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

23 KiB

05 RWMutex读写锁的实现原理及避坑指南

你好,我是鸟窝。

在前面的四节课中我们学习了第一个同步原语即Mutex我们使用它来保证读写共享资源的安全性。不管是读还是写我们都通过Mutex来保证只有一个goroutine访问共享资源这在某些情况下有点“浪费”。比如说在写少读多的情况下即使一段时间内没有写操作大量并发的读访问也不得不在Mutex的保护下变成了串行访问这个时候使用Mutex对性能的影响就比较大。

怎么办呢?你是不是已经有思路了,对,就是区分读写操作。

我来具体解释一下。如果某个读操作的goroutine持有了锁在这种情况下其它读操作的goroutine就不必一直傻傻地等待了而是可以并发地访问共享变量这样我们就可以将串行的读变成并行读提高读操作的性能。当写操作的goroutine持有锁的时候它就是一个排外锁其它的写操作和读操作的goroutine需要阻塞等待持有这个锁的goroutine释放锁。

这一类并发读写问题叫作readers-writers问题,意思就是,同时可能有多个读或者多个写,但是只要有一个线程在执行写操作,其它的线程都不能执行读写操作。

Go标准库中的RWMutex读写锁就是用来解决这类readers-writers问题的。所以这节课我们就一起来学习RWMutex。我会给你介绍读写锁的使用场景、实现原理以及容易掉入的坑你一定要记住这些陷阱避免在实际的开发中犯相同的错误。

什么是RWMutex

我先简单解释一下读写锁RWMutex。标准库中的RWMutex是一个 reader/writer 互斥锁。RWMutex在某一时刻只能由任意数量的reader持有或者是只被单个的writer持有。

RWMutex的方法也很少总共有5个。

  • Lock/Unlock写操作时调用的方法。如果锁已经被reader或者writer持有那么Lock方法会一直阻塞直到能获取到锁Unlock则是配对的释放锁的方法。
  • RLock/RUnlock读操作时调用的方法。如果锁已经被writer持有的话RLock方法会一直阻塞直到能获取到锁否则就直接返回而RUnlock是reader释放锁的方法。
  • RLocker这个方法的作用是为读操作返回一个Locker接口的对象。它的Lock方法会调用RWMutex的RLock方法它的Unlock方法会调用RWMutex的RUnlock方法。

RWMutex的零值是未加锁的状态所以当你使用RWMutex的时候无论是声明变量还是嵌入到其它struct中都不必显式地初始化。

我以计数器为例来说明一下如何使用RWMutex保护共享资源。计数器的count++操作是操作而获取count的值是操作这个场景非常适合读写锁因为读操作可以并行执行写操作时只允许一个线程执行这正是readers-writers问题。

在这个例子中使用10个goroutine进行读操作每读取一次sleep 1毫秒同时还有一个gorotine进行写操作每一秒写一次这是一个 1 writer-n reader 的读写场景,而且写操作还不是很频繁(一秒一次):

func main() {
    var counter Counter
    for i := 0; i < 10; i++ { // 10个reader
        go func() {
            for {
                counter.Count() // 计数器读操作
                time.Sleep(time.Millisecond)
            }
        }()
    }

    for { // 一个writer
        counter.Incr() // 计数器写操作
        time.Sleep(time.Second)
    }
}
// 一个线程安全的计数器
type Counter struct {
    mu    sync.RWMutex
    count uint64
}

// 使用写锁保护
func (c *Counter) Incr() {
    c.mu.Lock()
    c.count++
    c.mu.Unlock()
}

// 使用读锁保护
func (c *Counter) Count() uint64 {
    c.mu.RLock()
    defer c.mu.RUnlock()
    return c.count
}

可以看到Incr方法会修改计数器的值是一个写操作我们使用Lock/Unlock进行保护。Count方法会读取当前计数器的值是一个读操作我们使用RLock/RUnlock方法进行保护。

Incr方法每秒才调用一次所以writer竞争锁的频次是比较低的而10个goroutine每毫秒都要执行一次查询通过读写锁可以极大提升计数器的性能因为在读取的时候可以并发进行。如果使用Mutex性能就不会像读写锁这么好。因为多个reader并发读的时候使用互斥锁导致了reader要排队读的情况没有RWMutex并发读的性能好。

如果你遇到可以明确区分reader和writer goroutine的场景且有大量的并发读、少量的并发写并且有强烈的性能需求你就可以考虑使用读写锁RWMutex替换Mutex。

在实际使用RWMutex的时候如果我们在struct中使用RWMutex保护某个字段一般会把它和这个字段放在一起用来指示两个字段是一组字段。除此之外我们还可以采用匿名字段的方式嵌入struct这样在使用这个struct时我们就可以直接调用Lock/Unlock、RLock/RUnlock方法了这和我们前面在01讲中介绍Mutex的使用方法很类似你可以回去复习一下。

RWMutex的实现原理

RWMutex是很常见的并发原语很多编程语言的库都提供了类似的并发类型。RWMutex一般都是基于互斥锁、条件变量condition variables或者信号量semaphores等并发原语来实现。Go标准库中的RWMutex是基于Mutex实现的。

readers-writers问题一般有三类基于对读和写操作的优先级读写锁的设计和实现也分成三类。

  • Read-preferring:读优先的设计可以提供很高的并发性,但是,在竞争激烈的情况下可能会导致写饥饿。这是因为,如果有大量的读,这种设计会导致只有所有的读都释放了锁之后,写才可能获取到锁。
  • Write-preferring写优先的设计意味着如果已经有一个writer在等待请求锁的话它会阻止新来的请求锁的reader获取到锁所以优先保障writer。当然如果有一些reader已经请求了锁的话新请求的writer也会等待已经存在的reader都释放锁之后才能获取。所以写优先级设计中的优先权是针对新来的请求而言的。这种设计主要避免了writer的饥饿问题。
  • 不指定优先级这种设计比较简单不区分reader和writer优先级某些场景下这种不指定优先级的设计反而更有效因为第一类优先级会导致写饥饿第二类优先级可能会导致读饥饿这种不指定优先级的访问不再区分读写大家都是同一个优先级解决了饥饿的问题。

Go标准库中的RWMutex设计是Write-preferring方案。一个正在阻塞的Lock调用会排除新的reader请求到锁。

RWMutex包含一个Mutex以及四个辅助字段writerSem、readerSem、readerCount和readerWait

type RWMutex struct {
	w           Mutex   // 互斥锁解决多个writer的竞争
	writerSem   uint32  // writer信号量
	readerSem   uint32  // reader信号量
	readerCount int32   // reader的数量
	readerWait  int32   // writer等待完成的reader的数量
}

const rwmutexMaxReaders = 1 << 30

我来简单解释一下这几个字段。

  • 字段w为writer的竞争锁而设计
  • 字段readerCount记录当前reader的数量以及是否有writer竞争锁
  • readerWait记录writer请求锁时需要等待read完成的reader的数量
  • writerSem 和readerSem都是为了阻塞设计的信号量。

这里的常量rwmutexMaxReaders定义了最大的reader数量。

好了知道了RWMutex的设计方案和具体字段下面我来解释一下具体的方法实现。

RLock/RUnlock的实现

首先我们看一下移除了race等无关紧要的代码后的RLock和RUnlock方法

func (rw *RWMutex) RLock() {
    if atomic.AddInt32(&rw.readerCount, 1) < 0 {
            // rw.readerCount是负值的时候意味着此时有writer等待请求锁因为writer优先级高所以把后来的reader阻塞休眠
        runtime_SemacquireMutex(&rw.readerSem, false, 0)
    }
}
func (rw *RWMutex) RUnlock() {
    if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
        rw.rUnlockSlow(r) // 有等待的writer
    }
}
func (rw *RWMutex) rUnlockSlow(r int32) {
    if atomic.AddInt32(&rw.readerWait, -1) == 0 {
        // 最后一个reader了writer终于有机会获得锁了
        runtime_Semrelease(&rw.writerSem, false, 1)
    }
}

第2行是对reader计数加1。你可能比较困惑的是readerCount怎么还可能为负数呢其实这是因为readerCount这个字段有双重含义

  • 没有writer竞争或持有锁时readerCount和我们正常理解的reader的计数是一样的
  • 但是如果有writer竞争锁或者持有锁时那么readerCount不仅仅承担着reader的计数功能还能够标识当前是否有writer竞争或持有锁在这种情况下请求锁的reader的处理进入第4行阻塞等待锁的释放。

调用RUnlock的时候我们需要将Reader的计数减去1第8行因为reader的数量减少了一个。但是第8行的AddInt32的返回值还有另外一个含义。如果它是负值就表示当前有writer竞争锁在这种情况下还会调用rUnlockSlow方法检查是不是reader都释放读锁了如果读锁都释放了那么可以唤醒请求写锁的writer了。

当一个或者多个reader持有锁的时候竞争锁的writer会等待这些reader释放完才可能持有这把锁。打个比方在房地产行业中有条规矩叫做“买卖不破租赁意思是说就算房东把房子卖了新业主也不能把当前的租户赶走而是要等到租约结束后才能接管房子。这和RWMutex的设计是一样的。当writer请求锁的时候是无法改变既有的reader持有锁的现实的也不会强制这些reader释放锁它的优先权只是限定后来的reader不要和它抢。

所以rUnlockSlow将持有锁的reader计数减少1的时候会检查既有的reader是不是都已经释放了锁如果都释放了锁就会唤醒writer让writer持有锁。

Lock

RWMutex是一个多writer多reader的读写锁所以同时可能有多个writer和reader。那么为了避免writer之间的竞争RWMutex就会使用一个Mutex来保证writer的互斥。

一旦一个writer获得了内部的互斥锁就会反转readerCount字段把它从原来的正整数readerCount(>=0)修改为负数readerCount-rwmutexMaxReaders让这个字段保持两个含义既保存了reader的数量又表示当前有writer

我们来看下下面的代码。第5行还会记录当前活跃的reader数量所谓活跃的reader就是指持有读锁还没有释放的那些reader。

func (rw *RWMutex) Lock() {
    // 首先解决其他writer竞争问题
    rw.w.Lock()
    // 反转readerCount告诉reader有writer竞争锁
    r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
    // 如果当前有reader持有锁那么需要等待
    if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
        runtime_SemacquireMutex(&rw.writerSem, false, 0)
    }
}

如果readerCount不是0就说明当前有持有读锁的readerRWMutex需要把这个当前readerCount赋值给readerWait字段保存下来第7行 同时这个writer进入阻塞等待状态第8行

每当一个reader释放读锁的时候调用RUnlock方法时readerWait字段就减1直到所有的活跃的reader都释放了读锁才会唤醒这个writer。

Unlock

当一个writer释放锁的时候它会再次反转readerCount字段。可以肯定的是因为当前锁由writer持有所以readerCount字段是反转过的并且减去了rwmutexMaxReaders这个常数变成了负数。所以这里的反转方法就是给它增加rwmutexMaxReaders这个常数值。

既然writer要释放锁了那么就需要唤醒之后新来的reader不必再阻塞它们了让它们开开心心地继续执行就好了。

在RWMutex的Unlock返回之前需要把内部的互斥锁释放。释放完毕后其他的writer才可以继续竞争这把锁。

func (rw *RWMutex) Unlock() {
    // 告诉reader没有活跃的writer了
    r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
    
    // 唤醒阻塞的reader们
    for i := 0; i < int(r); i++ {
        runtime_Semrelease(&rw.readerSem, false, 0)
    }
    // 释放内部的互斥锁
    rw.w.Unlock()
}

在这段代码中我删除了race的处理和异常情况的检查总体看来还是比较简单的。这里有几个重点我要再提醒你一下。首先你要理解readerCount这个字段的含义以及反转方式。其次你还要注意字段的更改和内部互斥锁的顺序关系。在Lock方法中是先获取内部互斥锁才会修改的其他字段而在Unlock方法中是先修改的其他字段才会释放内部互斥锁这样才能保证字段的修改也受到互斥锁的保护。

好了到这里我们就完整学习了RWMutex的概念和实现原理。RWMutex的应用场景非常明确就是解决readers-writers问题。学完了今天的内容之后当你遇到这类问题时要优先想到RWMutex。另外Go并发原语代码实现的质量都很高非常精炼和高效所以你可以通过看它们的实现原理学习一些编程的技巧。当然还有非常重要的一点就是要知道reader或者writer请求锁的时候既有的reader/writer和后续请求锁的reader/writer之间的释放锁/请求锁)顺序关系。

有个很有意思的事儿就是官方的文档对RWMutex介绍是错误的或者说是不明确的在下一个版本Go 1.16官方会更改对RWMutex的介绍具体是这样的

A RWMutex is a reader/writer mutual exclusion lock.

The lock can be held by any number of readers or a single writer, and

a blocked writer also blocks new readers from acquiring the lock.

这个描述是相当精确的它指出了RWMutex可以被谁持有以及writer比后续的reader有获取锁的优先级。

虽然RWMutex暴露的API也很简单使用起来也没有复杂的逻辑但是和Mutex一样在实际使用的时候也会很容易踩到一些坑。接下来我给你重点介绍3个常见的踩坑点。

RWMutex的3个踩坑点

坑点1不可复制

前面刚刚说过RWMutex是由一个互斥锁和四个辅助字段组成的。我们很容易想到互斥锁是不可复制的再加上四个有状态的字段RWMutex就更加不能复制使用了。

不能复制的原因和互斥锁一样。一旦读写锁被使用,它的字段就会记录它当前的一些状态。这个时候你去复制这把锁,就会把它的状态也给复制过来。但是,原来的锁在释放的时候,并不会修改你复制出来的这个读写锁,这就会导致复制出来的读写锁的状态不对,可能永远无法释放锁。

那该怎么办呢其实解决方案也和互斥锁一样。你可以借助vet工具在变量赋值、函数传参、函数返回值、遍历数据、struct初始化等时检查是否有读写锁隐式复制的情景。

坑点2重入导致死锁

读写锁因为重入(或递归调用)导致死锁的情况更多。

我先介绍第一种情况。因为读写锁内部基于互斥锁实现对writer的并发访问而互斥锁本身是有重入问题的所以writer重入调用Lock的时候就会出现死锁的现象这个问题我们在学习互斥锁的时候已经了解过了。

func foo(l *sync.RWMutex) {
    fmt.Println("in foo")
    l.Lock()
    bar(l)
    l.Unlock()
}

func bar(l *sync.RWMutex) {
    l.Lock()
    fmt.Println("in bar")
    l.Unlock()
}

func main() {
    l := &sync.RWMutex{}
    foo(l)
}

运行这个程序你就会得到死锁的错误输出在Go运行的时候很容易就能检测出来。

第二种死锁的场景有点隐蔽。我们知道有活跃reader的时候writer会等待如果我们在reader的读操作时调用writer的写操作它会调用Lock方法那么这个reader和writer就会形成互相依赖的死锁状态。Reader想等待writer完成后再释放锁而writer需要这个reader释放锁之后才能不阻塞地继续执行。这是一个读写锁常见的死锁场景。

第三种死锁的场景更加隐蔽。

当一个writer请求锁的时候如果已经有一些活跃的reader它会等待这些活跃的reader完成才有可能获取到锁但是如果之后活跃的reader再依赖新的reader的话这些新的reader就会等待writer释放锁之后才能继续执行这就形成了一个环形依赖 writer依赖活跃的reader -> 活跃的reader依赖新来的reader -> 新来的reader依赖writer

这个死锁相当隐蔽原因在于它和RWMutex的设计和实现有关。啥意思呢我们来看一个计算阶乘(n!)的例子:

func main() {
    var mu sync.RWMutex

    // writer,稍微等待然后制造一个调用Lock的场景
    go func() {
        time.Sleep(200 * time.Millisecond)
        mu.Lock()
        fmt.Println("Lock")
        time.Sleep(100 * time.Millisecond)
        mu.Unlock()
        fmt.Println("Unlock")
    }()

    go func() {
        factorial(&mu, 10) // 计算10的阶乘, 10!
    }()
    
    select {}
}

// 递归调用计算阶乘
func factorial(m *sync.RWMutex, n int) int {
    if n < 1 { // 阶乘退出条件 
        return 0
    }
    fmt.Println("RLock")
    m.RLock()
    defer func() {
        fmt.Println("RUnlock")
        m.RUnlock()
    }()
    time.Sleep(100 * time.Millisecond)
    return factorial(m, n-1) * n // 递归调用
}

factoria方法是一个递归计算阶乘的方法我们用它来模拟reader。为了更容易地制造出死锁场景我在这里加上了sleep的调用延缓逻辑的执行。这个方法会调用读锁第27行在第33行递归地调用此方法每次调用都会产生一次读锁的调用所以可以不断地产生读锁的调用而且必须等到新请求的读锁释放这个读锁才能释放。

同时我们使用另一个goroutine去调用Lock方法来实现writer这个writer会等待200毫秒后才会调用Lock这样在调用Lock的时候factoria方法还在执行中不断调用RLock。

这两个goroutine互相持有锁并等待谁也不会退让一步满足了“writer依赖活跃的reader -> 活跃的reader依赖新来的reader -> 新来的reader依赖writer”的死锁条件所以就导致了死锁的产生。

所以,使用读写锁最需要注意的一点就是尽量避免重入,重入带来的死锁非常隐蔽,而且难以诊断。

坑点3释放未加锁的RWMutex

和互斥锁一样Lock和Unlock的调用总是成对出现的RLock和RUnlock的调用也必须成对出现。Lock和RLock多余的调用会导致锁没有被释放可能会出现死锁而Unlock和RUnlock多余的调用会导致panic。在生产环境中出现panic是大忌你总不希望半夜爬起来处理生产环境程序崩溃的问题吧所以在使用读写锁的时候一定要注意不遗漏不多余

流行的Go开发项目中的坑

好了又到了泡一杯宁夏枸杞加新疆大滩枣的养生茶静静地欣赏知名项目出现Bug的时候了这次被拉出来的是RWMutex的Bug。

Docker

issue 36840

issue 36840修复的是错误地把writer当成reader的Bug。 这个地方本来需要修改数据需要调用的是写锁结果用的却是读锁。或许是被它紧挨着的findNode方法调用迷惑了认为这只是一个读操作。可实际上代码后面还会有changeNodeState方法的调用这是一个写操作。修复办法也很简单只需要改成Lock/Unlock即可。

Kubernetes

issue 62464

issue 62464就是读写锁第二种死锁的场景这是一个典型的reader导致的死锁的例子。知道墨菲定律吧“凡是可能出错的事必定会出错”。你可能觉得我前面讲的RWMutex的坑绝对不会被人踩的因为道理大家都懂但是你看Kubernetes就踩了这个重入的坑。

这个issue在移除pod的时候可能会发生原因就在于GetCPUSetOrDefault方法会请求读锁同时它还会调用GetCPUSet或GetDefaultCPUSet方法。当这两个方法都请求写锁时是获取不到的因为GetCPUSetOrDefault方法还没有执行完不会释放读锁这就形成了死锁。

总结

在开发过程中一开始考虑共享资源并发访问问题的时候我们就会想到互斥锁Mutex。因为刚开始的时候我们还并不太了解并发的情况所以就会使用最简单的同步原语来解决问题。等到系统成熟真正到了需要性能优化的时候我们就能静下心来分析并发场景的可能性这个时候我们就要考虑将Mutex修改为RWMutex来压榨系统的性能。

当然如果一开始你的场景就非常明确了比如我就要实现一个线程安全的map那么一开始你就可以考虑使用读写锁。

正如我在前面提到的如果你能意识到你要解决的问题是一个readers-writers问题那么你就可以毫不犹豫地选择RWMutex不用考虑其它选择。那在使用RWMutex时最需要注意的一点就是尽量避免重入重入带来的死锁非常隐蔽而且难以诊断。

另外我们也可以扩展RWMutex不过实现方法和互斥锁Mutex差不多在技术上是一样的都是通过unsafe来实现我就不再具体讲了。课下你可以参照我们上节课学习的方法实现一个扩展的RWMutex。

这一讲我们系统学习了读写锁的相关知识,这里提供给你一个知识地图,帮助你复习本节课的知识。

思考题

请你写一个扩展的读写锁比如提供TryLock查询当前是否有writer、reader的数量等方法。

欢迎在留言区写下你的思考和答案,我们一起交流讨论。如果你觉得有所收获,也欢迎你把今天的内容分享给你的朋友或同事。