You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

38 KiB

33并发小channel中蕴含大智慧

你好我是Tony Bai。

通过上两节课的学习我们知道了Go语言实现了基于CSPCommunicating Sequential Processes理论的并发方案。

Go语言的CSP模型的实现包含两个主要组成部分一个是Goroutine它是Go应用并发设计的基本构建与执行单元另一个就是channel它在并发模型中扮演着重要的角色。channel既可以用来实现Goroutine间的通信还可以实现Goroutine间的同步。它就好比Go并发设计这门“武功”的秘籍口诀可以说学会在Go并发设计时灵活运用channel才能说真正掌握了Go并发设计的真谛。

所以在这一讲中我们就来系统学习channel这一并发原语的基础语法与常见使用方法。

作为一等公民的channel

Go对并发的原生支持可不是仅仅停留在口号上的Go在语法层面将并发原语channel作为一等公民对待。在前面的第21讲中我们已经学过“一等公民”这个概念了,如果你记不太清了可以回去复习一下。

那channel作为一等公民意味着什么呢

这意味着我们可以像使用普通变量那样使用channel比如定义channel类型变量、给channel变量赋值、将channel作为参数传递给函数/方法、将channel作为返回值从函数/方法中返回甚至将channel发送到其他channel中。这就大大简化了channel原语的使用提升了我们开发者在做并发设计和实现时的体验。

创建channel

和切片、结构体、map等一样channel也是一种复合数据类型。也就是说我们在声明一个channel类型变量时必须给出其具体的元素类型比如下面的代码这样

var ch chan int

这句代码里我们声明了一个元素为int类型的channel类型变量ch。

如果channel类型变量在声明时没有被赋予初值那么它的默认值为nil。并且和其他复合数据类型支持使用复合类型字面值作为变量初始值不同为channel类型变量赋初值的唯一方法就是使用make这个Go预定义的函数比如下面代码

ch1 := make(chan int)   
ch2 := make(chan int, 5) 

这里我们声明了两个元素类型为int的channel类型变量ch1和ch2并给这两个变量赋了初值。但我们看到两个变量的赋初值操作使用的make调用的形式有所不同。

第一行我们通过make(chan T)创建的、元素类型为T的channel类型无缓冲channel而第二行中通过带有capacity参数的make(chan T, capacity)创建的元素类型为T、缓冲区长度为capacity的channel类型带缓冲channel

这两种类型的变量关于发送send与接收receive的特性是不同的我们接下来就基于这两种类型的channel看看channel类型变量如何进行发送和接收数据元素。

发送与接收

Go提供了<-操作符用于对channel类型变量进行发送与接收操作

ch1 <- 13    // 将整型字面值13发送到无缓冲channel类型变量ch1中
n := <- ch1  // 从无缓冲channel类型变量ch1中接收一个整型值存储到整型变量n中
ch2 <- 17    // 将整型字面值17发送到带缓冲channel类型变量ch2中
m := <- ch2  // 从带缓冲channel类型变量ch2中接收一个整型值存储到整型变量m中

这里我要提醒你一句在理解channel的发送与接收操作时你一定要始终牢记channel是用于Goroutine间通信的所以绝大多数对channel的读写都被分别放在了不同的Goroutine中。

现在我们先来看看无缓冲channel类型变量如ch1的发送与接收。

由于无缓冲channel的运行时层实现不带有缓冲区所以Goroutine对无缓冲channel的接收和发送操作是同步的。也就是说对同一个无缓冲channel只有对它进行接收操作的Goroutine和对它进行发送操作的Goroutine都存在的情况下通信才能得以进行否则单方面的操作会让对应的Goroutine陷入挂起状态比如下面示例代码

func main() {
    ch1 := make(chan int)
    ch1 <- 13 // fatal error: all goroutines are asleep - deadlock!
    n := <-ch1
    println(n)
}

在这个示例中我们创建了一个无缓冲的channel类型变量ch1对ch1的读写都放在了一个Goroutine中。

运行这个示例我们就会得到fatal error提示我们所有Goroutine都处于休眠状态程序处于死锁状态。要想解除这种错误状态我们只需要将接收操作或者发送操作放到另外一个Goroutine中就可以了比如下面代码

func main() {
    ch1 := make(chan int)
    go func() {
        ch1 <- 13 // 将发送操作放入一个新goroutine中执行
    }()
    n := <-ch1
    println(n)
}

由此,我们可以得出结论:对无缓冲channel类型的发送与接收操作一定要放在两个不同的Goroutine中进行否则会导致deadlock

接下来我们再来看看带缓冲channel的发送与接收操作。

和无缓冲channel相反带缓冲channel的运行时层实现带有缓冲区因此对带缓冲channel的发送操作在缓冲区未满、接收操作在缓冲区非空的情况下是异步的(发送或接收不需要阻塞等待)。

也就是说对一个带缓冲channel来说在缓冲区未满的情况下对它进行发送操作的Goroutine并不会阻塞挂起在缓冲区有数据的情况下对它进行接收操作的Goroutine也不会阻塞挂起。

但当缓冲区满了的情况下对它进行发送操作的Goroutine就会阻塞挂起当缓冲区为空的情况下对它进行接收操作的Goroutine也会阻塞挂起。

如果光看文字还不是很好理解你可以再看看下面几个关于带缓冲channel的操作的例子

ch2 := make(chan int, 1)
n := <-ch2 // 由于此时ch2的缓冲区中无数据因此对其进行接收操作将导致goroutine挂起

ch3 := make(chan int, 1)
ch3 <- 17  // 向ch3发送一个整型数17
ch3 <- 27  // 由于此时ch3中缓冲区已满再向ch3发送数据也将导致goroutine挂起

也正是因为带缓冲channel与无缓冲channel在发送与接收行为上的差异在具体使用上它们有各自的“用武之地”这个我们等会再细说现在我们先继续把channel的基本语法讲完。

使用操作符<-,我们还可以声明只发送channel类型send-only只接收channel类型recv-only我们接着看下面这个例子

ch1 := make(chan<- int, 1) // 只发送channel类型
ch2 := make(<-chan int, 1) // 只接收channel类型

<-ch1       // invalid operation: <-ch1 (receive from send-only type chan<- int)
ch2 <- 13   // invalid operation: ch2 <- 13 (send to receive-only type <-chan int)

你可以从这个例子中看到试图从一个只发送channel类型变量中接收数据或者向一个只接收channel类型发送数据都会导致编译错误。通常只发送channel类型和只接收channel类型会被用作函数的参数类型或返回值用于限制对channel内的操作或者是明确可对channel进行的操作的类型比如下面这个例子

func produce(ch chan<- int) {
    for i := 0; i < 10; i++ {
        ch <- i + 1
        time.Sleep(time.Second)
    }
    close(ch)
}

func consume(ch <-chan int) {
    for n := range ch {
        println(n)
    }
}

func main() {
    ch := make(chan int, 5)
    var wg sync.WaitGroup
    wg.Add(2)
    go func() {
        produce(ch)
        wg.Done()
    }()

    go func() {
        consume(ch)
        wg.Done()
    }()

    wg.Wait()
}

在这个例子中我们启动了两个Goroutine分别代表生产者produce与消费者consume。生产者只能向channel中发送数据我们使用chan<- int作为produce函数的参数类型消费者只能从channel中接收数据我们使用<-chan int作为consume函数的参数类型。

在消费者函数consume中我们使用了for range循环语句来从channel中接收数据for range会阻塞在对channel的接收操作上直到channel中有数据可接收或channel被关闭循环才会继续向下执行。channel被关闭后for range循环也就结束了。

关闭channel

在上面的例子中produce函数在发送完数据后调用Go内置的close函数关闭了channel。channel关闭后所有等待从这个channel接收数据的操作都将返回。

这里我们继续看一下采用不同接收语法形式的语句在channel被关闭后的返回值的情况

n := <- ch      // 当ch被关闭后n将被赋值为ch元素类型的零值
m, ok := <-ch   // 当ch被关闭后m将被赋值为ch元素类型的零值, ok值为false
for v := range ch { // 当ch被关闭后for range循环结束
    ... ...
}

我们看到通过“comma, ok”惯用法或for range语句我们可以准确地判定channel是否被关闭。而单纯采用n := <-ch形式的语句我们就无法判定从ch返回的元素类型零值究竟是不是因为channel被关闭后才返回的。

另外从前面produce的示例程序中我们也可以看到channel是在produce函数中被关闭的这也是channel的一个使用惯例那就是发送端负责关闭channel

这里为什么要在发送端关闭channel呢

这是因为发送端没有像接受端那样的、可以安全判断channel是否被关闭了的方法。同时一旦向一个已经关闭的channel执行发送操作这个操作就会引发panic比如下面这个示例

ch := make(chan int, 5)
close(ch)
ch <- 13 // panic: send on closed channel

select

当涉及同时对多个channel进行操作时我们会结合Go为CSP并发模型提供的另外一个原语select,一起使用。

通过select我们可以同时在多个channel上进行发送/接收操作:

select {
case x := <-ch1:     // 从channel ch1接收数据
	... ...

case y, ok := <-ch2: // 从channel ch2接收数据并根据ok值判断ch2是否已经关闭
	... ...

case ch3 <- z:       // 将z值发送到channel ch3中:
	... ...

default:             // 当上面case中的channel通信均无法实施时执行该默认分支
}

当select语句中没有default分支而且所有case中的channel操作都阻塞了的时候整个select语句都将被阻塞直到某一个case上的channel变成可发送或者某个case上的channel变成可接收select语句才可以继续进行下去。关于select语句的妙用我们在后面还会细讲这里我们先简单了解它的基本语法。

看到这里你应该能感受到channel和select两种原语的操作都十分简单它们都遵循了Go语言**“追求简单”**的设计哲学但它们却为Go并发程序带来了强大的表达能力。学习了这些基础用法后接下来我们再深一层看看Go并发原语channel的一些惯用法。同样地这里我们也分成无缓冲channel和带缓冲channel两种情况来分析。

无缓冲channel的惯用法

无缓冲channel兼具通信和同步特性在并发程序中应用颇为广泛。现在我们来看看几个无缓冲channel的典型应用

第一种用法:用作信号传递

无缓冲channel用作信号传递的时候有两种情况分别是1对1通知信号和1对n通知信号。我们先来分析下1对1通知信号这种情况。

我们直接来看具体的例子:

type signal struct{}

func worker() {
    println("worker is working...")
    time.Sleep(1 * time.Second)
}

func spawn(f func()) <-chan signal {
    c := make(chan signal)
    go func() {
        println("worker start to work...")
        f()
        c <- signal{}
    }()
    return c
}

func main() {
    println("start a worker...")
    c := spawn(worker)
    <-c
    fmt.Println("worker work done!")
}

在这个例子中spawn函数返回的channel被用于承载新Goroutine退出的**“通知信号”**这个信号专门用作通知main goroutine。main goroutine在调用spawn函数后一直阻塞在对这个“通知信号”的接收动作上。

我们来运行一下这个例子:

start a worker...
worker start to work...
worker is working...
worker work done!

有些时候无缓冲channel还被用来实现1对n的信号通知机制。这样的信号通知机制常被用于协调多个Goroutine一起工作比如下面的例子

func worker(i int) {
    fmt.Printf("worker %d: is working...\n", i)
    time.Sleep(1 * time.Second)
    fmt.Printf("worker %d: works done\n", i)
}

type signal struct{}
func spawnGroup(f func(i int), num int, groupSignal <-chan signal) <-chan signal {
    c := make(chan signal)
    var wg sync.WaitGroup

    for i := 0; i < num; i++ {
        wg.Add(1)
        go func(i int) {
            <-groupSignal
            fmt.Printf("worker %d: start to work...\n", i)
            f(i)
            wg.Done()
        }(i + 1)
    }

    go func() {
        wg.Wait()
        c <- signal{}
    }()
    return c
}

func main() {
    fmt.Println("start a group of workers...")
    groupSignal := make(chan signal)
    c := spawnGroup(worker, 5, groupSignal)
    time.Sleep(5 * time.Second)
    fmt.Println("the group of workers start to work...")
    close(groupSignal)
    <-c
    fmt.Println("the group of workers work done!")
}

这个例子中main goroutine创建了一组5个worker goroutine这些Goroutine启动后会阻塞在名为groupSignal的无缓冲channel上。main goroutine通过close(groupSignal)向所有worker goroutine广播“开始工作”的信号收到groupSignal后所有worker goroutine会**“同时”**开始工作,就像起跑线上的运动员听到了裁判员发出的起跑信号枪声。

这个例子的运行结果如下:

start a group of workers...
the group of workers start to work...
worker 3: start to work...
worker 3: is working...
worker 4: start to work...
worker 4: is working...
worker 1: start to work...
worker 1: is working...
worker 5: start to work...
worker 5: is working...
worker 2: start to work...
worker 2: is working...
worker 3: works done
worker 4: works done
worker 5: works done
worker 1: works done
worker 2: works done
the group of workers work done!

我们可以看到关闭一个无缓冲channel会让所有阻塞在这个channel上的接收操作返回从而实现了一种1对n的**“广播”**机制。

第二种用法:用于替代锁机制

无缓冲channel具有同步特性这让它在某些场合可以替代锁让我们的程序更加清晰可读性也更好。我们可以对比下两个方案直观地感受一下。

首先我们看一个传统的、基于“共享内存”+“互斥锁”的Goroutine安全的计数器的实现

type counter struct {
    sync.Mutex
    i int
}

var cter counter

func Increase() int {
    cter.Lock()
    defer cter.Unlock()
    cter.i++
    return cter.i
}

func main() {
    var wg sync.WaitGroup

    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func(i int) {
            v := Increase()
            fmt.Printf("goroutine-%d: current counter value is %d\n", i, v)
            wg.Done()
        }(i)
    }

    wg.Wait()
}

在这个示例中我们使用了一个带有互斥锁保护的全局变量作为计数器所有要操作计数器的Goroutine共享这个全局变量并在互斥锁的同步下对计数器进行自增操作。

接下来我们再看更符合Go设计惯例的实现也就是使用无缓冲channel替代锁后的实现

type counter struct {
    c chan int
    i int
}

func NewCounter() *counter {
    cter := &counter{
        c: make(chan int),
    }
    go func() {
        for {
            cter.i++
            cter.c <- cter.i
        }
    }()
    return cter
}

func (cter *counter) Increase() int {
    return <-cter.c
}

func main() {
    cter := NewCounter()
    var wg sync.WaitGroup
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func(i int) {
            v := cter.Increase()
            fmt.Printf("goroutine-%d: current counter value is %d\n", i, v)
            wg.Done()
        }(i)
    }
    wg.Wait()
}

在这个实现中我们将计数器操作全部交给一个独立的Goroutine去处理并通过无缓冲channel的同步阻塞特性实现了计数器的控制。这样其他Goroutine通过Increase函数试图增加计数器值的动作实质上就转化为了一次无缓冲channel的接收动作。

这种并发设计逻辑更符合Go语言所倡导的**“不要通过共享内存来通信,而是通过通信来共享内存”**的原则。

运行这个示例,我们可以得出与互斥锁方案相同的结果:

goroutine-9: current counter value is 10
goroutine-0: current counter value is 1
goroutine-6: current counter value is 7
goroutine-2: current counter value is 3
goroutine-8: current counter value is 9
goroutine-4: current counter value is 5
goroutine-5: current counter value is 6
goroutine-1: current counter value is 2
goroutine-7: current counter value is 8
goroutine-3: current counter value is 4

带缓冲channel的惯用法

带缓冲的channel与无缓冲的channel的最大不同之处就在于它的异步性。也就是说对一个带缓冲channel在缓冲区未满的情况下对它进行发送操作的Goroutine不会阻塞挂起在缓冲区有数据的情况下对它进行接收操作的Goroutine也不会阻塞挂起。

这种特性让带缓冲的channel有着与无缓冲channel不同的应用场合。接下来我们一个个来分析。

第一种用法:用作消息队列

channel经常被Go初学者视为在多个Goroutine之间通信的消息队列这是因为channel的原生特性与我们认知中的消息队列十分相似包括Goroutine安全、有FIFOfirst-in, first out保证等。

其实和无缓冲channel更多用于信号/事件管道相比可自行设置容量、异步收发的带缓冲channel更适合被用作为消息队列并且带缓冲channel在数据收发的性能上要明显好于无缓冲channel。

我们可以通过对channel读写的基本测试来印证这一点。下面是一些关于无缓冲channel和带缓冲channel收发性能测试的结果Go 1.17, MacBook Pro 8核。基准测试的代码比较多我就不全部贴出来了你可以到这里下载。

  • 单接收单发送性能的基准测试
    我们先来看看针对一个channel只有一个发送Goroutine和一个接收Goroutine的情况两种channel的收发性能比对数据
// 无缓冲channel
// go-channel-operation-benchmark/unbuffered-chan

$go test -bench . one_to_one_test.go
goos: darwin
goarch: amd64
cpu: Intel(R) Core(TM) i5-8257U CPU @ 1.40GHz
BenchmarkUnbufferedChan1To1Send-8   	 6037778	       199.7 ns/op
BenchmarkUnbufferedChan1To1Recv-8   	 6286850	       194.5 ns/op
PASS
ok  	command-line-arguments	2.833s

// 带缓冲channel
// go-channel-operation-benchmark/buffered-chan

$go test -bench . one_to_one_cap_10_test.go
goos: darwin
goarch: amd64
cpu: Intel(R) Core(TM) i5-8257U CPU @ 1.40GHz
BenchmarkBufferedChan1To1SendCap10-8   	17089879	        66.16 ns/op
BenchmarkBufferedChan1To1RecvCap10-8   	18043450	        65.57 ns/op
PASS
ok  	command-line-arguments	2.460s

然后我们将channel的缓存由10改为100再看看带缓冲channel的1对1基准测试结果

$go test -bench . one_to_one_cap_100_test.go
goos: darwin
goarch: amd64
cpu: Intel(R) Core(TM) i5-8257U CPU @ 1.40GHz
BenchmarkBufferedChan1To1SendCap100-8   	23089318	        53.06 ns/op
BenchmarkBufferedChan1To1RecvCap100-8   	23474095	        51.33 ns/op
PASS
ok  	command-line-arguments	2.542s

  • 多接收多发送性能基准测试
    我们再来看看针对一个channel有多个发送Goroutine和多个接收Goroutine的情况两种channel的收发性能比对数据这里建立10个发送Goroutine和10个接收Goroutine
// 无缓冲channel
// go-channel-operation-benchmark/unbuffered-chan

$go test -bench .  multi_to_multi_test.go 
goos: darwin
goarch: amd64
cpu: Intel(R) Core(TM) i5-8257U CPU @ 1.40GHz
BenchmarkUnbufferedChanNToNSend-8   	  293930	      3779 ns/op
BenchmarkUnbufferedChanNToNRecv-8   	  280904	      4190 ns/op
PASS
ok  	command-line-arguments	2.387s

// 带缓冲channel
// go-channel-operation-benchmark/buffered-chan

$go test -bench . multi_to_multi_cap_10_test.go 
goos: darwin
goarch: amd64
cpu: Intel(R) Core(TM) i5-8257U CPU @ 1.40GHz
BenchmarkBufferedChanNToNSendCap10-8   	  736540	      1609 ns/op
BenchmarkBufferedChanNToNRecvCap10-8   	  795416	      1616 ns/op
PASS
ok  	command-line-arguments	2.514s

这里我们也将channel的缓存由10改为100后看看带缓冲channel的多对多基准测试结果

$go test -bench . multi_to_multi_cap_100_test.go 
goos: darwin
goarch: amd64
cpu: Intel(R) Core(TM) i5-8257U CPU @ 1.40GHz
BenchmarkBufferedChanNToNSendCap100-8   	 1236453	       966.4 ns/op
BenchmarkBufferedChanNToNRecvCap100-8   	 1279766	       969.4 ns/op
PASS
ok  	command-line-arguments	4.309s

综合前面这些结果数据,我们可以得出几个初步结论:

  • 无论是1收1发还是多收多发带缓冲channel的收发性能都要好于无缓冲channel
  • 对于带缓冲channel而言发送与接收的Goroutine数量越多收发性能会有所下降
  • 对于带缓冲channel而言选择适当容量会在一定程度上提升收发性能。

不过你要注意的是Go支持channel的初衷是将它作为Goroutine间的通信手段它并不是专门用于消息队列场景的。如果你的项目需要专业消息队列的功能特性比如支持优先级、支持权重、支持离线持久化等那么channel就不合适了可以使用第三方的专业的消息队列实现。

第二种用法用作计数信号量counting semaphore

Go并发设计的一个惯用法就是将带缓冲channel用作计数信号量counting semaphore。带缓冲channel中的当前数据个数代表的是当前同时处于活动状态处理业务的Goroutine的数量而带缓冲channel的容量capacity就代表了允许同时处于活动状态的Goroutine的最大数量。向带缓冲channel的一个发送操作表示获取一个信号量而从channel的一个接收操作则表示释放一个信号量。

这里我们来看一个将带缓冲channel用作计数信号量的例子

var active = make(chan struct{}, 3)
var jobs = make(chan int, 10)

func main() {
    go func() {
        for i := 0; i < 8; i++ {
            jobs <- (i + 1)
        }
        close(jobs)
    }()

    var wg sync.WaitGroup

    for j := range jobs {
        wg.Add(1)
        go func(j int) {
            active <- struct{}{}
            log.Printf("handle job: %d\n", j)
            time.Sleep(2 * time.Second)
            <-active
            wg.Done()
        }(j)
    }
    wg.Wait()
}

我们看到这个示例创建了一组Goroutine来处理job同一时间允许最多3个Goroutine处于活动状态。

为了达成这一目标我们看到这个示例使用了一个容量capacity为3的带缓冲channel: active作为计数信号量,这意味着允许同时处于活动状态的最大Goroutine数量为3。

我们运行一下这个示例:

2022/01/02 10:08:55 handle job: 1
2022/01/02 10:08:55 handle job: 4
2022/01/02 10:08:55 handle job: 8
2022/01/02 10:08:57 handle job: 5
2022/01/02 10:08:57 handle job: 7
2022/01/02 10:08:57 handle job: 6
2022/01/02 10:08:59 handle job: 3
2022/01/02 10:08:59 handle job: 2

从示例运行结果中的时间戳中我们可以看到虽然我们创建了很多Goroutine但由于计数信号量的存在同一时间内处于活动状态正在处理job的Goroutine的数量最多为3个。

len(channel)的应用

len是Go语言的一个内置函数它支持接收数组、切片、map、字符串和channel类型的参数并返回对应类型的“长度”也就是一个整型值。

针对channel ch的类型不同len(ch)有如下两种语义:

  • 当ch为无缓冲channel时len(ch)总是返回0
  • 当ch为带缓冲channel时len(ch)返回当前channel ch中尚未被读取的元素个数。

这样一来针对带缓冲channel的len调用似乎才是有意义的。那我们是否可以使用len函数来实现带缓冲channel的“判满”、“判有”和“判空”逻辑呢就像下面示例中伪代码这样

var ch chan T = make(chan T, capacity)

// 判空
if len(ch) == 0 {
    // 此时channel ch空了?
}

// 判有
if len(ch) > 0 {
    // 此时channel ch中有数据?
}

// 判满
if len(ch) == cap(ch) {
    // 此时channel ch满了?
}

你可以看到,我在上面代码注释的“空了”、“有数据”和“满了”的后面都**打上了问号****。**这是为什么呢?

这是因为channel原语用于多个Goroutine间的通信一旦多个Goroutine共同对channel进行收发操作len(channel)就会在多个Goroutine间形成“竞态”。单纯地依靠len(channel)来判断channel中元素状态是不能保证在后续对channel的收发时channel状态是不变的。

我们以判空为例看看:

图片

从上图可以看到Goroutine1使用len(channel)判空后就会尝试从channel中接收数据。但在它真正从channel读数据之前另外一个Goroutine2已经将数据读了出去所以Goroutine1后面的读取就会阻塞在channel上,导致后面逻辑的失效。

因此,为了不阻塞在channel上常见的方法是将“判空与读取”放在一个“事务”中将“判满与写入”放在一个“事务”中而这类“事务”我们可以通过select实现。我们来看下面示例

func producer(c chan<- int) {
    var i int = 1
    for {
        time.Sleep(2 * time.Second)
        ok := trySend(c, i)
        if ok {
            fmt.Printf("[producer]: send [%d] to channel\n", i)
            i++
            continue
        }
        fmt.Printf("[producer]: try send [%d], but channel is full\n", i)
    }
}

func tryRecv(c <-chan int) (int, bool) {
    select {
    case i := <-c:
        return i, true

    default:
        return 0, false
    }
}

func trySend(c chan<- int, i int) bool {
    select {
    case c <- i:
        return true
    default:
        return false
    }
}

func consumer(c <-chan int) {
    for {
        i, ok := tryRecv(c)
        if !ok {
            fmt.Println("[consumer]: try to recv from channel, but the channel is empty")
            time.Sleep(1 * time.Second)
            continue
        }
        fmt.Printf("[consumer]: recv [%d] from channel\n", i)
        if i >= 3 {
            fmt.Println("[consumer]: exit")
            return
        }
    }
}

func main() {
    var wg sync.WaitGroup
    c := make(chan int, 3)
    wg.Add(2)
    go func() {
        producer(c)
        wg.Done()
    }()

    go func() {
        consumer(c)
        wg.Done()
    }()

    wg.Wait()
}

我们看到由于用到了select原语的default分支语义当channel空的时候tryRecv不会阻塞当channel满的时候trySend也不会阻塞。

这个示例的运行结果也证明了这一点无论是使用tryRecv的consumer还是使用trySend的producer都不会阻塞

[consumer]: try to recv from channel, but the channel is empty
[consumer]: try to recv from channel, but the channel is empty
[producer]: send [1] to channel
[consumer]: recv [1] from channel
[consumer]: try to recv from channel, but the channel is empty
[consumer]: try to recv from channel, but the channel is empty
[producer]: send [2] to channel
[consumer]: recv [2] from channel
[consumer]: try to recv from channel, but the channel is empty
[consumer]: try to recv from channel, but the channel is empty
[producer]: send [3] to channel
[consumer]: recv [3] from channel
[consumer]: exit
[producer]: send [4] to channel
[producer]: send [5] to channel
[producer]: send [6] to channel
[producer]: try send [7], but channel is full
[producer]: try send [7], but channel is full
[producer]: try send [7], but channel is full
... ...

这种方法适用于大多数场合但是这种方法有一个“问题”那就是它改变了channel的状态会让channel接收了一个元素或发送一个元素到channel。

有些时候我们不想这么做我们想在不改变channel状态的前提下单纯地侦测channel的状态而又不会因channel满或空阻塞在channel上。但很遗憾目前没有一种方法可以在实现这样的功能的同时适用于所有场合。

但是在特定的场景下我们可以用len(channel)来实现。比如下面这两种场景:

图片

上图中的情景(a)是一个“多发送单接收”的场景,也就是有多个发送者,但有且只有一个接收者。在这样的场景下我们可以在接收goroutine中使用len(channel)是否大于0来判断是否channel中有数据需要接收。

而情景(b)呢,是一个“多接收单发送”的场景,也就是有多个接收者,但有且只有一个发送者。在这样的场景下我们可以在发送Goroutine中使用len(channel)是否小于cap(channel)来判断是否可以执行向channel的发送操作。

nil channel的妙用

如果一个channel类型变量的值为nil我们称它为nil channel。nil channel有一个特性那就是对nil channel的读写都会发生阻塞。比如下面示例代码

func main() {
	var c chan int
	<-c //阻塞
}

或者:

func main() {
	var c chan int
	c<-1  //阻塞
}

你会看到无论上面的哪段代码被执行main goroutine都会阻塞在对nil channel的操作上。

不过nil channel的这个特性可不是一无是处有些时候应用nil channel的这个特性可以得到事半功倍的效果。我们来看一个例子

func main() {
    ch1, ch2 := make(chan int), make(chan int)
    go func() {
        time.Sleep(time.Second * 5)
        ch1 <- 5
        close(ch1)
    }()

    go func() {
        time.Sleep(time.Second * 7)
        ch2 <- 7
        close(ch2)
    }()

    var ok1, ok2 bool
    for {
        select {
        case x := <-ch1:
            ok1 = true
            fmt.Println(x)
        case x := <-ch2:
            ok2 = true
            fmt.Println(x)
        }

        if ok1 && ok2 {
            break
        }
    }
    fmt.Println("program end")
}

在这个示例中我们期望程序在接收完ch1和ch2两个channel上的数据后就退出。但实际的运行情况却是这样的

5
0
0
0
... ... //循环输出0
7
program end

我们原本期望上面这个在依次输出5和7两个数字后退出但实际运行的输出结果却是在输出5之后程序输出了许多的0值之后才输出7并退出。

这是怎么回事呢?我们简单分析一下这段代码的运行过程:

  • 前5sselect一直处于阻塞状态
  • 第5sch1返回一个5后被closeselect语句的case x := <-ch1这个分支被选出执行程序输出5并回到for循环并重新select
  • 由于ch1被关闭从一个已关闭的channel接收数据将永远不会被阻塞于是新一轮select又把case x := <-ch1这个分支选出并执行。由于ch1处于关闭状态从这个channel获取数据我们会得到这个channel对应类型的零值这里就是0。于是程序再次输出0程序按这个逻辑循环执行一直输出0值
  • 2s后ch2被写入了一个数值7。这样在某一轮select的过程中分支case x := <-ch2被选中得以执行程序输出7之后满足退出条件于是程序终止。

那我们可以怎么改进一下这个程序,让它能按照我们的预期输出呢?

是时候让nil channel登场了用nil channel改进后的示例代码是这样的

func main() {
    ch1, ch2 := make(chan int), make(chan int)
    go func() {
        time.Sleep(time.Second * 5)
        ch1 <- 5
        close(ch1)
    }()

    go func() {
        time.Sleep(time.Second * 7)
        ch2 <- 7
        close(ch2)
    }()

    for {
        select {
        case x, ok := <-ch1:
            if !ok {
                ch1 = nil
            } else {
                fmt.Println(x)
            }
        case x, ok := <-ch2:
            if !ok {
                ch2 = nil
            } else {
                fmt.Println(x)
            }
        }
        if ch1 == nil && ch2 == nil {
            break
        }
    }
    fmt.Println("program end")
}

这里改进后的示例程序的最关键的一个变化就是在判断ch1或ch2被关闭后显式地将ch1或ch2置为nil。

而我们前面已经知道了,对一个nil channel执行获取操作这个操作将阻塞。于是这里已经被置为nil的c1或c2的分支将再也不会被select选中执行。

改进后的示例的运行结果如下,与我们预期相符:

5
7
program end

与select结合使用的一些惯用法

channel和select的结合使用能形成强大的表达能力我们在前面的例子中已经或多或少见识过了。这里我再强调几种channel与select结合的惯用法。

第一种用法利用default分支避免阻塞

select语句的default分支的语义就是在其他非default分支因通信未就绪而无法被选择的时候执行的这就给default分支赋予了一种“避免阻塞”的特性。

其实在前面的**“len(channel)的应用”**小节的例子中我们就已经用到了“利用default分支”实现的trySendtryRecv两个函数:

func tryRecv(c <-chan int) (int, bool) {
	select {
	case i := <-c:
		return i, true

	default: // channel为空
		return 0, false
	}
}

func trySend(c chan<- int, i int) bool {
	select {
	case c <- i:
		return true
	default: // channel满了
		return false
	}
}

而且无论是无缓冲channel还是带缓冲channel这两个函数都能适用并且不会阻塞在空channel或元素个数已经达到容量的channel上。

在Go标准库中这个惯用法也有应用比如

// $GOROOT/src/time/sleep.go
func sendTime(c interface{}, seq uintptr) {
    // 无阻塞的向c发送当前时间
    select {
    case c.(chan Time) <- Now():
    default:
    }
}

第二种用法:实现超时机制

带超时机制的select是Go中常见的一种select和channel的组合用法。通过超时事件我们既可以避免长期陷入某种操作的等待中也可以做一些异常处理工作。

比如下面示例代码实现了一次具有30s超时的select

func worker() {
	select {
	case <-c:
	     // ... do some stuff
	case <-time.After(30 *time.Second):
	    return
	}
}

不过在应用带有超时机制的select时我们要特别注意timer使用后的释放尤其在大量创建timer的时候。

Go语言标准库提供的timer实际上是由Go运行时自行维护的而不是操作系统级的定时器资源它的使用代价要比操作系统级的低许多。但即便如此作为time.Timer的使用者我们也要尽量减少在使用Timer时给Go运行时和Go垃圾回收带来的压力要及时调用timer的Stop方法回收Timer资源。

第三种用法:实现心跳机制

结合time包的Ticker我们可以实现带有心跳机制的select。这种机制让我们可以在监听channel的同时执行一些周期性的任务,比如下面这段代码:

func worker() {
	heartbeat := time.NewTicker(30 * time.Second)
	defer heartbeat.Stop()
	for {
		select {
		case <-c:
			// ... do some stuff
		case <- heartbeat.C:
			//... do heartbeat stuff
		}
	}
}

这里我们使用time.NewTicker创建了一个Ticker类型实例heartbeat。这个实例包含一个channel类型的字段C这个字段会按一定时间间隔持续产生事件就像“心跳”一样。这样for循环在channel c无数据接收时会每隔特定时间完成一次迭代然后回到for循环进行下一次迭代。

和timer一样我们在使用完ticker之后也不要忘记调用它的Stop方法避免心跳事件在ticker的channel上面示例中的heartbeat.C中持续产生。

小结

好了,今天的课讲到这里就结束了,现在我们一起来回顾一下吧。

在这一讲中我们系统学习了Go CSP并发方案中除Goroutine之外的另一个重要组成部分channel。Go为了原生支持并发把channel视作一等公民身份这就大幅提升了开发人员使用channel进行并发设计和实现的体验。

通过预定义函数make我们可以创建两类channel无缓冲channel与带缓冲的channel。这两类channel具有不同的收发特性可以适用于不同的应用场合无缓冲channel兼具通信与同步特性常用于作为信号通知或替代同步锁而带缓冲channel的异步性让它更适合用来实现基于内存的消息队列、计数信号量等。

此外你也要牢记值为nil的channel的阻塞特性有些时候它也能帮上大忙。而面对已关闭的channel你也一定要小心尤其要避免向已关闭的channel发送数据那会导致panic。

最后select是Go为了支持同时操作多个channel而引入的另外一个并发原语select与channel有几种常用的固定搭配你也要好好掌握和理解。

思考题

channel作为Go并发设计的重要组成部分需要你掌握的细节非常多。而且channel的应用模式也非常多我们这一讲仅挑了几个常见的模式做了讲解。在日常开发中你还见过哪些实用的channel使用模式呢欢迎在留言区分享。

如果你觉得有收获也欢迎你把这节课分享给更多对Go并发感兴趣的朋友。我是Tony Bai我们下节课见。