You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

19 KiB

08 | Once一个简约而不简单的并发原语

你好,我是鸟窝。

这一讲我来讲一个简单的并发原语Once。为什么要学习Once呢我先给你答案Once可以用来执行且仅仅执行一次动作常常用于单例对象的初始化场景。

那这节课,我们就从对单例对象进行初始化这件事儿说起。

初始化单例资源有很多方法比如定义package级别的变量这样程序在启动的时候就可以初始化

package abc

import time

var startTime = time.Now()

或者在init函数中进行初始化

package abc

var startTime time.Time

func init() {
  startTime = time.Now()
}


又或者在main函数开始执行的时候执行一个初始化的函数

package abc

var startTime time.Tim

func initApp() {
    startTime = time.Now()
}
func main() {
  initApp()
}

这三种方法都是线程安全的,并且后两种方法还可以根据传入的参数实现定制化的初始化操作。

但是很多时候我们是要延迟进行初始化的,所以有时候单例资源的初始化,我们会使用下面的方法:

package main

import (
    "net"
    "sync"
    "time"
)

// 使用互斥锁保证线程(goroutine)安全
var connMu sync.Mutex
var conn net.Conn

func getConn() net.Conn {
    connMu.Lock()
    defer connMu.Unlock()

    // 返回已创建好的连接
    if conn != nil {
        return conn
    }

    // 创建连接
    conn, _ = net.DialTimeout("tcp", "baidu.com:80", 10*time.Second)
    return conn
}

// 使用连接
func main() {
    conn := getConn()
    if conn == nil {
        panic("conn is nil")
    }
}

这种方式虽然实现起来简单,但是有性能问题。一旦连接创建好,每次请求的时候还是得竞争锁才能读取到这个连接,这是比较浪费资源的,因为连接如果创建好之后,其实就不需要锁的保护了。怎么办呢?

这个时候就可以使用这一讲要介绍的Once并发原语了。接下来我会详细介绍Once的使用、实现和易错场景。

Once的使用场景

sync.Once只暴露了一个方法Do你可以多次调用Do方法但是只有第一次调用Do方法时f参数才会执行这里的f是一个无参数无返回值的函数。

func (o *Once) Do(f func())

因为当且仅当第一次调用Do方法的时候参数f才会执行即使第二次、第三次、第n次调用时f参数的值不一样也不会被执行比如下面的例子虽然f1和f2是不同的函数但是第二个函数f2就不会执行。

package main


import (
    "fmt"
    "sync"
)

func main() {
    var once sync.Once

    // 第一个初始化函数
    f1 := func() {
        fmt.Println("in f1")
    }
    once.Do(f1) // 打印出 in f1

    // 第二个初始化函数
    f2 := func() {
        fmt.Println("in f2")
    }
    once.Do(f2) // 无输出
}

因为这里的f参数是一个无参数无返回的函数所以你可能会通过闭包的方式引用外面的参数比如

    var addr = "baidu.com"

    var conn net.Conn
    var err error

    once.Do(func() {
        conn, err = net.Dial("tcp", addr)
    })

而且在实际的使用中,绝大多数情况下,你会使用闭包的方式去初始化外部的一个资源。

你看Once的使用场景很明确所以在标准库内部实现中也常常能看到Once的身影。

比如标准库内部cache的实现上就使用了Once初始化Cache资源包括defaultDir值的获取

    func Default() *Cache { // 获取默认的Cache
		defaultOnce.Do(initDefaultCache) // 初始化cache
		return defaultCache
	}
	
    // 定义一个全局的cache变量使用Once初始化所以也定义了一个Once变量
	var (
		defaultOnce  sync.Once
		defaultCache *Cache
	)

    func initDefaultCache() { //初始化cache,也就是Once.Do使用的f函数
		......
		defaultCache = c
	}

    // 其它一些Once初始化的变量比如defaultDir
    var (
		defaultDirOnce sync.Once
		defaultDir     string
		defaultDirErr  error
	)



还有一些测试的时候初始化测试的资源(export_windows_test

    // 测试window系统调用时区相关函数
    func ForceAusFromTZIForTesting() {
		ResetLocalOnceForTest()
        // 使用Once执行一次初始化
		localOnce.Do(func() { initLocalFromTZI(&aus) })
	}

除此之外还有保证只调用一次copyenv的envOncestrings包下的Replacertime包中的测试Go拉取库时的proxynet.pipecrc64Regexp……数不胜数。我给你重点介绍一下很值得我们学习的 math/big/sqrt.go中实现的一个数据结构它通过Once封装了一个只初始化一次的值

   // 值是3.0或者0.0的一个数据结构
   var threeOnce struct {
		sync.Once
		v *Float
	}
	
    // 返回此数据结构的值如果还没有初始化为3.0,则初始化
	func three() *Float {
		threeOnce.Do(func() { // 使用Once初始化
			threeOnce.v = NewFloat(3.0)
		})
		return threeOnce.v
	}

它将sync.Once和*Float封装成一个对象提供了只初始化一次的值v。 你看它的three方法的实现虽然每次都调用threeOnce.Do方法但是参数只会被调用一次。

当你使用Once的时候你也可以尝试采用这种结构将值和Once封装成一个新的数据结构提供只初始化一次的值。

总结一下Once并发原语解决的问题和使用场景Once常常用来初始化单例资源或者并发访问只需初始化一次的共享资源或者在测试的时候初始化一次测试资源

了解了Once的使用场景那应该怎样实现一个Once呢

如何实现一个Once

很多人认为实现一个Once一样的并发原语很简单只需使用一个flag标记是否初始化过即可最多是用atomic原子操作这个flag比如下面的实现

type Once struct {
    done uint32
}

func (o *Once) Do(f func()) {
    if !atomic.CompareAndSwapUint32(&o.done, 0, 1) {
        return
    }
    f()
}

这确实是一种实现方式但是这个实现有一个很大的问题就是如果参数f执行很慢的话后续调用Do方法的goroutine虽然看到done已经设置为执行过了但是获取某些初始化资源的时候可能会得到空的资源因为f还没有执行完。

所以,一个正确的Once实现要使用一个互斥锁这样初始化的时候如果有并发的goroutine就会进入doSlow方法。互斥锁的机制保证只有一个goroutine进行初始化同时利用双检查的机制double-checking再次判断o.done是否为0如果为0则是第一次执行执行完毕后就将o.done设置为1然后释放锁。

即使此时有多个goroutine同时进入了doSlow方法因为双检查的机制后续的goroutine会看到o.done的值为1也不会再次执行f。

这样既保证了并发的goroutine会等待f完成而且还不会多次执行f。

type Once struct {
    done uint32
    m    Mutex
}

func (o *Once) Do(f func()) {
    if atomic.LoadUint32(&o.done) == 0 {
        o.doSlow(f)
    }
}


func (o *Once) doSlow(f func()) {
    o.m.Lock()
    defer o.m.Unlock()
    // 双检查
    if o.done == 0 {
        defer atomic.StoreUint32(&o.done, 1)
        f()
    }
}

好了到这里我们就了解了Once的使用场景很明确同时呢也感受到Once的实现也是相对简单的。在实践中其实很少会出现错误使用Once的情况但是就像墨菲定律说的凡是可能出错的事就一定会出错。使用Once也有可能出现两种错误场景尽管非常罕见。我这里提前讲给你咱打个预防针。

使用Once可能出现的2种错误

第一种错误:死锁

你已经知道了Do方法会执行一次f但是如果f中再次调用这个Once的Do方法的话就会导致死锁的情况出现。这还不是无限递归的情况而是的的确确的Lock的递归调用导致的死锁。

func main() {
    var once sync.Once
    once.Do(func() {
        once.Do(func() {
            fmt.Println("初始化")
        })
    })
}

当然想要避免这种情况的出现就不要在f参数中调用当前的这个Once不管是直接的还是间接的。

第二种错误:未初始化

如果f方法执行的时候panic或者f执行初始化资源的时候失败了这个时候Once还是会认为初次执行已经成功了即使再次调用Do方法也不会再次执行f。

比如下面的例子由于一些防火墙的原因googleConn并没有被正确的初始化后面如果想当然认为既然执行了Do方法googleConn就已经初始化的话会抛出空指针的错误

func main() {
    var once sync.Once
    var googleConn net.Conn // 到Google网站的一个连接

    once.Do(func() {
        // 建立到google.com的连接有可能因为网络的原因googleConn并没有建立成功此时它的值为nil
        googleConn, _ = net.Dial("tcp", "google.com:80")
    })
    // 发送http请求
    googleConn.Write([]byte("GET / HTTP/1.1\r\nHost: google.com\r\n Accept: */*\r\n\r\n"))
    io.Copy(os.Stdout, googleConn)
}

既然执行过Once.Do方法也可能因为函数执行失败的原因未初始化资源并且以后也没机会再次初始化资源那么这种初始化未完成的问题该怎么解决呢

这里我来告诉你一招独家秘笈,我们可以自己实现一个类似Once的并发原语既可以返回当前调用Do方法是否正确完成还可以在初始化失败后调用Do方法再次尝试初始化直到初始化成功才不再初始化了。

// 一个功能更加强大的Once
type Once struct {
    m    sync.Mutex
    done uint32
}
// 传入的函数f有返回值error如果初始化失败需要返回失败的error
// Do方法会把这个error返回给调用者
func (o *Once) Do(f func() error) error {
    if atomic.LoadUint32(&o.done) == 1 { //fast path
        return nil
    }
    return o.slowDo(f)
}
// 如果还没有初始化
func (o *Once) slowDo(f func() error) error {
    o.m.Lock()
    defer o.m.Unlock()
    var err error
    if o.done == 0 { // 双检查,还没有初始化
        err = f()
        if err == nil { // 初始化成功才将标记置为已初始化
            atomic.StoreUint32(&o.done, 1)
        }
    }
    return err
}

我们所做的改变就是Do方法和参数f函数都会返回error如果f执行失败会把这个错误信息返回。

对slowDo方法也做了调整如果f调用失败我们不会更改done字段的值这样后续的goroutine还会继续调用f。如果f执行成功才会修改done的值为1。

可以说真是一顿操作猛如虎我们使用Once有点得心应手的感觉了。等等还有个问题我们怎么查询是否初始化过呢

目前的Once实现可以保证你调用任意次数的once.Do方法它只会执行这个方法一次。但是有时候我们需要打一个标记。如果初始化后我们就去执行其它的操作标准库的Once并不会告诉你是否初始化完成了只是让你放心大胆地去执行Do方法所以你还需要一个辅助变量,自己去检查是否初始化过了比如通过下面的代码中的inited字段

type AnimalStore struct {once   sync.Once;inited uint32}
func (a *AnimalStore) Init() // 可以被并发调用
	a.once.Do(func() {
		longOperationSetupDbOpenFilesQueuesEtc()
		atomic.StoreUint32(&a.inited, 1)
	})
}
func (a *AnimalStore) CountOfCats() (int, error) { // 另外一个goroutine
	if atomic.LoadUint32(&a.inited) == 0 { // 初始化后才会执行真正的业务逻辑
		return 0, NotYetInitedError
	}
        //Real operation
}

当然通过这段代码我们可以解决这类问题但是如果官方的Once类型有Done这样一个方法的话我们就可以直接使用了。这是有人在Go代码库中提出的一个issue(#41690)。对于这类问题,一般都会被建议采用其它类型,或者自己去扩展。我们可以尝试扩展这个并发原语:

// Once 是一个扩展的sync.Once类型提供了一个Done方法
type Once struct {
    sync.Once
}

// Done 返回此Once是否执行过
// 如果执行过则返回true
// 如果没有执行过或者正在执行返回false
func (o *Once) Done() bool {
    return atomic.LoadUint32((*uint32)(unsafe.Pointer(&o.Once))) == 1
}

func main() {
    var flag Once
    fmt.Println(flag.Done()) //false

    flag.Do(func() {
        time.Sleep(time.Second)
    })

    fmt.Println(flag.Done()) //true
}

好了到这里关于并发原语Once的内容我讲得就差不多了。最后呢和你分享一个Once的踩坑案例。

其实啊使用Once真的不容易犯错想犯错都很困难因为很少有人会傻傻地在初始化函数f中递归调用f这种死锁的现象几乎不会发生。另外如果函数初始化不成功我们一般会panic或者在使用的时候做检查会及早发现这个问题在初始化函数中加强代码。

所以查看大部分的Go项目几乎找不到Once的错误使用场景不过我还是发现了一个。这个issue先从另外一个需求(go#25955)谈起。

Once的踩坑案例

go#25955有网友提出一个需求希望Once提供一个Reset方法能够将Once重置为初始化的状态。比如下面的例子St通过两个Once控制它的Open/Close状态。但是在Close之后再调用Open的话不会再执行init函数因为Once只会执行一次初始化函数。

type St struct {
    openOnce *sync.Once
    closeOnce *sync.Once
}

func(st *St) Open(){
    st.openOnce.Do(func() { ... }) // init
    ...
}

func(st *St) Close(){
    st.closeOnce.Do(func() { ... }) // deinit
    ...
}

所以提交这个Issue的开发者希望Once增加一个Reset方法Reset之后再调用once.Do就又可以初始化了。

Go的核心开发者Ian Lance Taylor给他了一个简单的解决方案。在这个例子中只使用一个ponce *sync.Once 做初始化Reset的时候给ponce这个变量赋值一个新的Once实例即可(ponce = new(sync.Once))。Once的本意就是执行一次所以Reset破坏了这个并发原语的本意。

这个解决方案一点都没问题可以很好地解决这位开发者的需求。Docker较早的版本1.11.2中使用了它们的一个网络库libnetwork这个网络库在使用Once的时候就使用Ian Lance Taylor介绍的方法但是不幸的是它的Reset方法中又改变了Once指针的值导致程序panic了。原始逻辑比较复杂一个简化版可重现的代码如下:

package main

import (
	"fmt"
	"sync"
	"time"
)

// 一个组合的并发原语
type MuOnce struct {
	sync.RWMutex
	sync.Once
	mtime time.Time
	vals  []string
}

// 相当于reset方法会将m.Once重新复制一个Once
func (m *MuOnce) refresh() {
	m.Lock()
	defer m.Unlock()
	m.Once = sync.Once{}
	m.mtime = time.Now()
	m.vals = []string{m.mtime.String()}
}

// 获取某个初始化的值如果超过某个时间会reset Once
func (m *MuOnce) strings() []string {
	now := time.Now()
	m.RLock()
	if now.After(m.mtime) {
		defer m.Do(m.refresh) // 使用refresh函数重新初始化
	}
	vals := m.vals
	m.RUnlock()
	return vals
}

func main() {
	fmt.Println("Hello, playground")
	m := new(MuOnce)
	fmt.Println(m.strings())
	fmt.Println(m.strings())
}

如果你执行这段代码就会panic:

原因在于第31行执行m.Once.Do方法的时候使用的是m.Once的指针然后调用m.refresh在执行m.refresh的时候Once内部的Mutex首先会加锁可以再翻看一下这一讲的Once的实现原理 但是在refresh中更改了Once指针的值之后结果在执行完refresh释放锁的时候释放的是一个刚初始化未加锁的Mutex所以就panic了。

如果你还不太明白,我再给你简化成一个更简单的例子:

package main


import (
    "sync"
)

type Once struct {
    m sync.Mutex
}

func (o *Once) doSlow() {
    o.m.Lock()
    defer o.m.Unlock()

    // 这里更新的o指针的值!!!!!!!, 会导致上一行Unlock出错
    *o = Once{}
}

func main() {
    var once Once
    once.doSlow()
}

doSlow方法就演示了这个错误。Ian Lance Taylor介绍的Reset方法没有错误但是你在使用的时候千万别再初始化函数中Reset这个Once否则势必会导致Unlock一个未加锁的Mutex的错误。

总的来说这还是对Once的实现机制不熟悉又进行复杂使用导致的错误。不过最新版的libnetwork相关的地方已经去掉了Once的使用了。所以我带你一起来看这个案例主要目的还是想巩固一下我们对Once的理解。

总结

今天我们一起学习了Once我们常常使用它来实现单例模式。

单例是23种设计模式之一也是常常引起争议的设计模式之一甚至有人把它归为反模式。为什么说它是反模式呢我拿标准库中的单例模式给你介绍下。

因为Go没有immutable类型导致我们声明的全局变量都是可变的别的地方或者第三方库可以随意更改这些变量。比如package io中定义了几个全局变量比如io.EOF

var EOF = errors.New("EOF")

因为它是一个package级别的变量我们可以在程序中偷偷把它改了这会导致一些依赖io.EOF这个变量做判断的代码出错。

io.EOF = errors.New("我们自己定义的EOF")

从我个人的角度来说一些单例全局变量的确很方便比如Buffer池或者连接池所以有时候我们也不要谈虎色变。虽然有人把单例模式称之为反模式但毕竟只能代表一部分开发者的观点否则也不会把它列在23种设计模式中了。

如果你真的担心这个package级别的变量被人修改你可以不把它们暴露出来而是提供一个只读的GetXXX的方法这样别人就不会进行修改了。

而且Once不只应用于单例模式一些变量在也需要在使用的时候做延迟初始化所以也是可以使用Once处理这些场景的。

总而言之Once的应用场景还是很广泛的。一旦你遇到只需要初始化一次的场景首先想到的就应该是Once并发原语。

思考题

  1. 我已经分析了几个并发原语的实现你可能注意到总是有些slowXXXX的方法从XXXX方法中单独抽取出来你明白为什么要这么做吗有什么好处

  2. Once在第一次使用之后还能复制给其它变量使用吗

欢迎在留言区写下你的思考和答案,我们一起交流讨论。如果你觉得有所收获,也欢迎你把今天的内容分享给你的朋友或同事。