You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

21 KiB

12 | atomic要保证原子操作一定要使用这几种方法

你好,我是鸟窝。

前面我们在学习Mutex、RWMutex等并发原语的实现时你可以看到最底层是通过atomic包中的一些原子操作来实现的。当时为了让你的注意力集中在这些原语的功能实现上我并没有展开介绍这些原子操作是干什么用的。

你可能会说,这些并发原语已经可以应对大多数的并发场景了,为啥还要学习原子操作呢?其实,这是因为,在很多场景中,使用并发原语实现起来比较复杂,而原子操作可以帮助我们更轻松地实现底层的优化。

所以现在我会专门用一节课带你仔细地了解一下什么是原子操作atomic包都提供了哪些实现原子操作的方法。另外我还会带你实现一个基于原子操作的数据结构。好了接下来我们先来学习下什么是原子操作。

原子操作的基础知识

Package sync/atomic 实现了同步算法底层的原子的内存操作原语,我们把它叫做原子操作原语,它提供了一些实现原子操作的方法。

之所以叫原子操作,是因为一个原子在执行的时候,其它线程不会看到执行一半的操作结果。在其它线程看来,原子操作要么执行完了,要么还没有执行,就像一个最小的粒子-原子一样,不可分割。

CPU提供了基础的原子操作不过不同架构的系统的原子操作是不一样的。

对于单处理器单核系统来说如果一个操作是由一个CPU指令来实现的那么它就是原子操作比如它的XCHG和INC等指令。如果操作是基于多条指令来实现的那么执行的过程中可能会被中断并执行上下文切换这样的话原子性的保证就被打破了因为这个时候操作可能只执行了一半。

在多处理器多核系统中,原子操作的实现就比较复杂了。

由于cache的存在单个核上的单个指令进行原子操作的时候你要确保其它处理器或者核不访问此原子操作的地址或者是确保其它处理器或者核总是访问原子操作之后的最新的值。x86架构中提供了指令前缀LOCKLOCK保证了指令比如LOCK CMPXCHG op1、op2不会受其它处理器或CPU核的影响有些指令比如XCHG本身就提供Lock的机制。不同的CPU架构提供的原子操作指令的方式也是不同的比如对于多核的MIPS和ARM提供了LL/SCLoad Link/Store Conditional指令可以帮助实现原子操作ARMLL/SC指令 LDREX和STREX

因为不同的CPU架构甚至不同的版本提供的原子操作的指令是不同的所以要用一种编程语言实现支持不同架构的原子操作是相当有难度的。不过还好这些都不需要你操心因为Go提供了一个通用的原子操作的API将更底层的不同的架构下的实现封装成atomic包提供了修改类型的原子操作atomic read-modify-writeRMW和加载存储类型的原子操作Load和Store的API稍后我会一一介绍。

有的代码也会因为架构的不同而不同。有时看起来貌似一个操作是原子操作但实际上对于不同的架构来说情况是不一样的。比如下面的代码的第4行是将一个64位的值赋值给变量i

const x int64 = 1 + 1<<33

func main() {
    var i = x
    _ = i
}

如果你使用GOARCH=386的架构去编译这段代码那么第5行其实是被拆成了两个指令分别操作低32位和高32位使用 GOARCH=386 go tool compile -N -l test.goGOARCH=386 go tool objdump -gnu test.o反编译试试

如果GOARCH=amd64的架构去编译这段代码那么第5行其中的赋值操作其实是一条指令

所以如果要想保证原子操作切记一定要使用atomic提供的方法。

好了了解了什么是原子操作以及不同系统的不同原子操作接下来我来介绍下atomic原子操作的应用场景。

atomic原子操作的应用场景

开篇我说过使用atomic的一些方法我们可以实现更底层的一些优化。如果使用Mutex等并发原语进行这些优化虽然可以解决问题但是这些并发原语的实现逻辑比较复杂对性能还是有一定的影响的。

举个例子假设你想在程序中使用一个标志flag比如一个bool类型的变量来标识一个定时任务是否已经启动执行了你会怎么做呢

我们先来看看加锁的方法。如果使用Mutex和RWMutex在读取和设置这个标志的时候加锁是可以做到互斥的、保证同一时刻只有一个定时任务在执行的所以使用Mutex或者RWMutex是一种解决方案。

其实这个场景中的问题不涉及到对资源复杂的竞争逻辑只是会并发地读写这个标志这类场景就适合使用atomic的原子操作。具体怎么做呢你可以使用一个uint32类型的变量如果这个变量的值是0就标识没有任务在执行如果它的值是1就标识已经有任务在完成了。你看是不是很简单呢

再来看一个例子。假设你在开发应用程序的时候需要从配置服务器中读取一个节点的配置信息。而且在这个节点的配置发生变更的时候你需要重新从配置服务器中拉取一份新的配置并更新。你的程序中可能有多个goroutine都依赖这份配置涉及到对这个配置对象的并发读写你可以使用读写锁实现对配置对象的保护。在大部分情况下你也可以利用atomic实现配置对象的更新和加载。

分析到这里可以看到这两个例子都可以使用基本并发原语来实现的只不过我们不需要这些基本并发原语里面的复杂逻辑而是只需要其中的简单原子操作所以这些场景可以直接使用atomic包中的方法去实现。

有时候你也可以使用atomic实现自己定义的基本并发原语比如Go issue有人提议的CondMutex、Mutex.LockContext、WaitGroup.Go等我们可以使用atomic或者基于它的更高一级的并发原语去实现。我先前讲的几种基本并发原语的底层比如Mutex就是基于通过atomic的方法实现的。

除此之外atomic原子操作还是实现lock-free数据结构的基石。

在实现lock-free的数据结构时我们可以不使用互斥锁这样就不会让线程因为等待互斥锁而阻塞休眠而是让线程保持继续处理的状态。另外不使用互斥锁的话lock-free的数据结构还可以提供并发的性能。

不过lock-free的数据结构实现起来比较复杂需要考虑的东西很多有兴趣的同学可以看一位微软专家写的一篇经验分享Lockless Programming Considerations for Xbox 360 and Microsoft Windows这里我们不细谈了。不过这节课的最后我会带你开发一个lock-free的queue来学习下使用atomic操作实现lock-free数据结构的方法你可以拿它和使用互斥锁实现的queue做性能对比看看在性能上是否有所提升。

看到这里你是不是觉得atomic非常重要呢不过要想能够灵活地应用atomic我们首先得知道atomic提供的所有方法。

atomic提供的方法

目前的Go的泛型的特性还没有发布Go的标准库中的很多实现会显得非常啰嗦多个类型会实现很多类似的方法尤其是atomic包最为明显。相信泛型支持之后atomic的API会清爽很多。

atomic为了支持int32、int64、uint32、uint64、uintptr、PointerAdd方法不支持类型分别提供了AddXXX、CompareAndSwapXXX、SwapXXX、LoadXXX、StoreXXX等方法。不过你也不要担心你只要记住了一种数据类型的方法的意义其它数据类型的方法也是一样的。

关于atomic还有一个地方你一定要记住atomic操作的对象是一个地址你需要把可寻址的变量的地址作为参数传递给方法而不是把变量的值传递给方法

好了下面我就来给你介绍一下atomic提供的方法。掌握了这些你就可以说完全掌握了atomic包。

Add

首先我们来看Add方法的签名

其实Add方法就是给第一个参数地址中的值增加一个delta值。

对于有符号的整数来说delta可以是一个负数相当于减去一个值。对于无符号的整数和uinptr类型来说怎么实现减去一个值呢毕竟atomic并没有提供单独的减法操作。

我来跟你说一种方法。你可以利用计算机补码的规则把减法变成加法。以uint32类型为例

AddUint32(&x, ^uint32(c-1)).

如果是对uint64的值进行操作那么就把上面的代码中的uint32替换成uint64。

尤其是减1这种特殊的操作我们可以简化为

AddUint32(&x, ^uint32(0))

好了我们再来看看CAS方法。

CAS CompareAndSwap

以int32为例我们学习一下CAS提供的功能。在CAS的方法签名中需要提供要操作的地址、原数据值、新值如下所示

func CompareAndSwapInt32(addr *int32, old, new int32) (swapped bool)

我们来看下这个方法的功能。

这个方法会比较当前addr地址里的值是不是old如果不等于old就返回false如果等于old就把此地址的值替换成new值返回true。这就相当于“判断相等才替换”。

如果使用伪代码来表示这个原子操作,代码如下:

if *addr == old {
	*addr = new
	return true
}
return false

它支持的类型和方法如图所示:

Swap

如果不需要比较旧值只是比较粗暴地替换的话就可以使用Swap方法它替换后还可以返回旧值伪代码如下

old = *addr
*addr = new
return old

它支持的数据类型和方法如图所示:

Load

Load方法会取出addr地址中的值即使在多处理器、多核、有CPU cache的情况下这个操作也能保证Load是一个原子操作。

它支持的数据类型和方法如图所示:

Store

Store方法会把一个值存入到指定的addr地址中即使在多处理器、多核、有CPU cache的情况下这个操作也能保证Store是一个原子操作。别的goroutine通过Load读取出来不会看到存取了一半的值。

它支持的数据类型和方法如图所示:

Value类型

刚刚说的都是一些比较常见的类型其实atomic还提供了一个特殊的类型Value。它可以原子地存取对象类型但也只能存取不能CAS和Swap常常用在配置变更等场景中。

接下来我以一个配置变更的例子来演示Value类型的使用。这里定义了一个Value类型的变量config 用来存储配置信息。

首先我们启动一个goroutine然后让它随机sleep一段时间之后就变更一下配置并通过我们前面学到的Cond并发原语通知其它的reader去加载新的配置。

接下来我们启动一个goroutine等待配置变更的信号一旦有变更它就会加载最新的配置。

通过这个例子你可以了解到Value的Store/Load方法的使用因为它只有这两个方法只要掌握了它们的使用你就完全掌握了Value类型。

type Config struct {
    NodeName string
    Addr     string
    Count    int32
}

func loadNewConfig() Config {
    return Config{
        NodeName: "北京",
        Addr:     "10.77.95.27",
        Count:    rand.Int31(),
    }
}
func main() {
    var config atomic.Value
    config.Store(loadNewConfig())
    var cond = sync.NewCond(&sync.Mutex{})

    // 设置新的config
    go func() {
        for {
            time.Sleep(time.Duration(5+rand.Int63n(5)) * time.Second)
            config.Store(loadNewConfig())
            cond.Broadcast() // 通知等待着配置已变更
        }
    }()

    go func() {
        for {
            cond.L.Lock()
            cond.Wait()                 // 等待变更信号
            c := config.Load().(Config) // 读取新的配置
            fmt.Printf("new config: %+v\n", c)
            cond.L.Unlock()
        }
    }()

    select {}
}

好了关于标准库的atomic提供的方法到这里我们就学完了。事实上atomic包提供了非常好的支持各种平台的一致性的API绝大部分项目都是直接使用它。接下来我再给你介绍一下第三方库帮助你稍微开拓一下思维。

第三方库的扩展

其实atomic的API已经算是很简单的了它提供了包一级的函数可以对几种类型的数据执行原子操作。

不过有一点让人觉得不爽的是或者是让熟悉面向对象编程的程序员不爽的是函数调用有一点点麻烦。所以有些人就对这些函数做了进一步的包装跟atomic中的Value类型类似这些类型也提供了面向对象的使用方式比如关注度比较高的uber-go/atomic它定义和封装了几种与常见类型相对应的原子操作类型这些类型提供了原子操作的方法。这些类型包括Bool、Duration、Error、Float64、Int32、Int64、String、Uint32、Uint64等。

比如Bool类型提供了CAS、Store、Swap、Toggle等原子方法还提供String、MarshalJSON、UnmarshalJSON等辅助方法确实是一个精心设计的atomic扩展库。关于这些方法你一看名字就能猜出来它们的功能我就不多说了。

其它的数据类型也和Bool类型相似使用起来就像面向对象的编程一样你可以看下下面的这段代码。

    var running atomic.Bool
    running.Store(true)
    running.Toggle()
    fmt.Println(running.Load()) // false

使用atomic实现Lock-Free queue

atomic常常用来实现Lock-Free的数据结构这次我会给你展示一个Lock-Free queue的实现。

Lock-Free queue最出名的就是 Maged M. Michael 和 Michael L. Scott 1996年发表的论文中的算法算法比较简单容易实现伪代码的每一行都提供了注释我就不在这里贴出伪代码了因为我们使用Go实现这个数据结构的代码几乎和伪代码一样

package queue
import (
	"sync/atomic"
	"unsafe"
)
// lock-free的queue
type LKQueue struct {
	head unsafe.Pointer
	tail unsafe.Pointer
}
// 通过链表实现,这个数据结构代表链表中的节点
type node struct {
	value interface{}
	next  unsafe.Pointer
}
func NewLKQueue() *LKQueue {
	n := unsafe.Pointer(&node{})
	return &LKQueue{head: n, tail: n}
}
// 入队
func (q *LKQueue) Enqueue(v interface{}) {
	n := &node{value: v}
	for {
		tail := load(&q.tail)
		next := load(&tail.next)
		if tail == load(&q.tail) { // 尾还是尾
			if next == nil { // 还没有新数据入队
				if cas(&tail.next, next, n) { //增加到队尾
					cas(&q.tail, tail, n) //入队成功,移动尾巴指针
					return
				}
			} else { // 已有新数据加到队列后面,需要移动尾指针
				cas(&q.tail, tail, next)
			}
		}
	}
}
// 出队没有元素则返回nil
func (q *LKQueue) Dequeue() interface{} {
	for {
		head := load(&q.head)
		tail := load(&q.tail)
		next := load(&head.next)
		if head == load(&q.head) { // head还是那个head
			if head == tail { // head和tail一样
				if next == nil { // 说明是空队列
					return nil
				}
				// 只是尾指针还没有调整,尝试调整它指向下一个
				cas(&q.tail, tail, next)
			} else {
				// 读取出队的数据
				v := next.value
                // 既然要出队了,头指针移动到下一个
				if cas(&q.head, head, next) {
					return v // Dequeue is done.  return
				}
			}
		}
	}
}

// 将unsafe.Pointer原子加载转换成node
func load(p *unsafe.Pointer) (n *node) {
	return (*node)(atomic.LoadPointer(p))
}

// 封装CAS,避免直接将*node转换成unsafe.Pointer
func cas(p *unsafe.Pointer, old, new *node) (ok bool) {
	return atomic.CompareAndSwapPointer(
		p, unsafe.Pointer(old), unsafe.Pointer(new))
}

我来给你介绍下这里的主要逻辑。

这个lock-free的实现使用了一个辅助头指针head头指针不包含有意义的数据只是一个辅助的节点这样的话出队入队中的节点会更简单。

入队的时候通过CAS操作将一个元素添加到队尾并且移动尾指针。

出队的时候移除一个节点并通过CAS操作移动head指针同时在必要的时候移动尾指针。

总结

好了我们来小结一下。这节课我们学习了atomic的基本使用方法以及它提供的几种方法包括Add、CAS、Swap、Load、Store、Value类型。除此之外我还介绍了一些第三方库并且带你实现了Lock-free queue。到这里相信你已经掌握了atomic提供的各种方法并且能够应用到实践中了。

最后,我还想和你讨论一个额外的问题:对一个地址的赋值是原子操作吗?

这是一个很有趣的问题如果是原子操作还要atomic包干什么官方的文档中并没有特意的介绍不过在一些issue或者论坛中每当有人谈到这个问题时总是会被建议用atomic包。

Dave Cheney就谈到过这个问题讲得非常好。我来给你总结一下他讲的知识点这样你就比较容易理解使用atomic和直接内存操作的区别了。

在现在的系统中write的地址基本上都是对齐的aligned。 比如32位的操作系统、CPU以及编译器write的地址总是4的倍数64位的系统总是8的倍数还记得WaitGroup针对64位系统和32位系统对state1的字段不同的处理吗。对齐地址的写不会导致其他人看到只写了一半的数据因为它通过一个指令就可以实现对地址的操作。如果地址不是对齐的话那么处理器就需要分成两个指令去处理如果执行了一个指令其它人就会看到更新了一半的错误的数据这被称做撕裂写torn write 。所以,你可以认为赋值操作是一个原子操作,这个“原子操作”可以认为是保证数据的完整性。

但是对于现代的多处理多核的系统来说由于cache、指令重排可见性等问题我们对原子操作的意义有了更多的追求。在多核系统中一个核对地址的值的更改在更新到主内存中之前是在多级缓存中存放的。这时多个核看到的数据可能是不一样的其它的核可能还没有看到更新的数据还在使用旧的数据。

多处理器多核心系统为了处理这类问题使用了一种叫做内存屏障memory fence或memory barrier的方式。一个写内存屏障会告诉处理器必须要等到它管道中的未完成的操作特别是写操作都被刷新到内存中再进行操作。此操作还会让相关的处理器的CPU缓存失效以便让它们从主存中拉取最新的值。

atomic包提供的方法会提供内存屏障的功能所以atomic不仅仅可以保证赋值的数据完整性还能保证数据的可见性一旦一个核更新了该地址的值其它处理器总是能读取到它的最新值。但是需要注意的是因为需要处理器之间保证数据的一致性atomic的操作也是会降低性能的。

思考题

atomic.Value只有Load/Store方法你是不是感觉意犹未尽你可以尝试为Value类型增加 Swap和CompareAndSwap方法可以参考一下这份资料)。

欢迎在留言区写下你的思考和答案,我们一起交流讨论。如果你觉得有所收获,也欢迎你把今天的内容分享给你的朋友或同事。