You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

27 KiB

14 | Channel透过代码看典型的应用模式

你好,我是鸟窝。

前一讲我介绍了Channel的基础知识并且总结了几种应用场景。这一讲我将通过实例的方式带你逐个学习Channel解决这些问题的方法帮你巩固和完全掌握它的用法。

在开始上课之前我先补充一个知识点通过反射的方式执行select语句在处理很多的case clause尤其是不定长的case clause的时候非常有用。而且在后面介绍任务编排的实现时我也会采用这种方法所以我先带你具体学习下Channel的反射用法。

使用反射操作Channel

select语句可以处理chan的send和recvsend和recv都可以作为case clause。如果我们同时处理两个chan就可以写成下面的样子

    select {
    case v := <-ch1:
        fmt.Println(v)
    case v := <-ch2:
        fmt.Println(v)
    }

如果需要处理三个chan你就可以再添加一个case clause用它来处理第三个chan。可是如果要处理100个chan呢一万个chan呢

或者是chan的数量在编译的时候是不定的在运行的时候需要处理一个slice of chan这个时候也没有办法在编译前写成字面意义的select。那该怎么办

这个时候,就要“祭”出我们的反射大法了。

通过reflect.Select函数你可以将一组运行时的case clause传入当作参数执行。Go的select是伪随机的它可以在执行的case中随机选择一个case并把选择的这个case的索引chosen返回如果没有可用的case返回会返回一个bool类型的返回值这个返回值用来表示是否有case成功被选择。如果是recv case还会返回接收的元素。Select的方法签名如下

func Select(cases []SelectCase) (chosen int, recv Value, recvOK bool)

下面我来借助一个例子来演示一下动态处理两个chan的情形。因为这样的方式可以动态处理case数据所以你可以传入几百几千几万的chan这就解决了不能动态处理n个chan的问题。

首先createCases函数分别为每个chan生成了recv case和send case并返回一个reflect.SelectCase数组。

然后通过一个循环10次的for循环执行reflect.Select这个方法会从cases中选择一个case执行。第一次肯定是send case因为此时chan还没有元素recv还不可用。等chan中有了数据以后recv case就可以被选择了。这样你就可以处理不定数量的chan了。

func main() {
    var ch1 = make(chan int, 10)
    var ch2 = make(chan int, 10)

    // 创建SelectCase
    var cases = createCases(ch1, ch2)

    // 执行10次select
    for i := 0; i < 10; i++ {
        chosen, recv, ok := reflect.Select(cases)
        if recv.IsValid() { // recv case
            fmt.Println("recv:", cases[chosen].Dir, recv, ok)
        } else { // send case
            fmt.Println("send:", cases[chosen].Dir, ok)
        }
    }
}

func createCases(chs ...chan int) []reflect.SelectCase {
    var cases []reflect.SelectCase


    // 创建recv case
    for _, ch := range chs {
        cases = append(cases, reflect.SelectCase{
            Dir:  reflect.SelectRecv,
            Chan: reflect.ValueOf(ch),
        })
    }

    // 创建send case
    for i, ch := range chs {
        v := reflect.ValueOf(i)
        cases = append(cases, reflect.SelectCase{
            Dir:  reflect.SelectSend,
            Chan: reflect.ValueOf(ch),
            Send: v,
        })
    }

    return cases
}

典型的应用场景

了解刚刚的反射用法我们就解决了今天的基础知识问题接下来我就带你具体学习下Channel的应用场景。

首先来看消息交流。

消息交流

从chan的内部实现看它是以一个循环队列的方式存放数据所以它有时候也会被当成线程安全的队列和buffer使用。一个goroutine可以安全地往Channel中塞数据另外一个goroutine可以安全地从Channel中读取数据goroutine就可以安全地实现信息交流了。

我们来看几个例子。

第一个例子是worker池的例子。Marcio Castilho 在 使用Go每分钟处理百万请求 这篇文章中,就介绍了他们应对大并发请求的设计。他们将用户的请求放在一个 chan Job 中这个chan Job就相当于一个待处理任务队列。除此之外还有一个chan chan Job队列用来存放可以处理任务的worker的缓存队列。

dispatcher会把待处理任务队列中的任务放到一个可用的缓存队列中worker会一直处理它的缓存队列。通过使用Channel实现了一个worker池的任务处理中心并且解耦了前端HTTP请求处理和后端任务处理的逻辑。

我在讲Pool的时候提到了一些第三方实现的worker池它们全部都是通过Channel实现的这是Channel的一个常见的应用场景。worker池的生产者和消费者的消息交流都是通过Channel实现的。

第二个例子是etcd中的node节点的实现包含大量的chan字段比如recvc是消息处理的chan待处理的protobuf消息都扔到这个chan中node有一个专门的run goroutine负责处理这些消息。

数据传递

“击鼓传花”的游戏很多人都玩过,花从一个人手中传给另外一个人,就有点类似流水线的操作。这个花就是数据,花在游戏者之间流转,这就类似编程中的数据传递。

还记得上节课我给你留了一道任务编排的题吗?其实它就可以用数据传递的方式实现。

有4个goroutine编号为1、2、3、4。每秒钟会有一个goroutine打印出它自己的编号要求你编写程序让输出的编号总是按照1、2、3、4、1、2、3、4……这个顺序打印出来。

为了实现顺序的数据传递,我们可以定义一个令牌的变量,谁得到令牌,谁就可以打印一次自己的编号,同时将令牌传递给下一个goroutine我们尝试使用chan来实现可以看下下面的代码。

type Token struct{}

func newWorker(id int, ch chan Token, nextCh chan Token) {
    for {
        token := <-ch         // 取得令牌
        fmt.Println((id + 1)) // id从1开始
        time.Sleep(time.Second)
        nextCh <- token
    }
}
func main() {
    chs := []chan Token{make(chan Token), make(chan Token), make(chan Token), make(chan Token)}

    // 创建4个worker
    for i := 0; i < 4; i++ {
        go newWorker(i, chs[i], chs[(i+1)%4])
    }

    //首先把令牌交给第一个worker
    chs[0] <- struct{}{}
  
    select {}
}

我来给你具体解释下这个实现方式。

首先我们定义一个令牌类型Token接着定义一个创建worker的方法这个方法会从它自己的chan中读取令牌。哪个goroutine取得了令牌就可以打印出自己编号因为需要每秒打印一次数据所以我们让它休眠1秒后再把令牌交给它的下家。

接着在第16行启动每个worker的goroutine并在第20行将令牌先交给第一个worker。

如果你运行这个程序就会在命令行中看到每一秒就会输出一个编号而且编号是以1、2、3、4这样的顺序输出的。

这类场景有一个特点就是当前持有数据的goroutine都有一个信箱信箱使用chan实现goroutine只需要关注自己的信箱中的数据处理完毕后就把结果发送到下一家的信箱中。

信号通知

chan类型有这样一个特点chan如果为空那么receiver接收数据的时候就会阻塞等待直到chan被关闭或者有新的数据到来。利用这个机制我们可以实现wait/notify的设计模式。

传统的并发原语Cond也能实现这个功能。但是Cond使用起来比较复杂容易出错而使用chan实现wait/notify模式就方便多了。

除了正常的业务处理时的wait/notify我们经常碰到的一个场景就是程序关闭的时候我们需要在退出之前做一些清理doCleanup方法的动作。这个时候我们经常要使用chan。

比如使用chan实现程序的graceful shutdown在退出之前执行一些连接关闭、文件close、缓存落盘等一些动作。

func main() {
	go func() {
      ...... // 执行业务处理
    }()

	// 处理CTRL+C等中断信号
	termChan := make(chan os.Signal)
	signal.Notify(termChan, syscall.SIGINT, syscall.SIGTERM)
	<-termChan 

	// 执行退出之前的清理动作
    doCleanup()
	
	fmt.Println("优雅退出")
}

有时候doCleanup可能是一个很耗时的操作比如十几分钟才能完成如果程序退出需要等待这么长时间用户是不能接受的所以在实践中我们需要设置一个最长的等待时间。只要超过了这个时间程序就不再等待可以直接退出。所以退出的时候分为两个阶段

  1. closing代表程序退出但是清理工作还没做
  2. closed代表清理工作已经做完。

所以,上面的例子可以改写如下:

func main() {
    var closing = make(chan struct{})
    var closed = make(chan struct{})

    go func() {
        // 模拟业务处理
        for {
            select {
            case <-closing:
                return
            default:
                // ....... 业务计算
                time.Sleep(100 * time.Millisecond)
            }
        }
    }()

    // 处理CTRL+C等中断信号
    termChan := make(chan os.Signal)
    signal.Notify(termChan, syscall.SIGINT, syscall.SIGTERM)
    <-termChan

    close(closing)
    // 执行退出之前的清理动作
    go doCleanup(closed)

    select {
    case <-closed:
    case <-time.After(time.Second):
        fmt.Println("清理超时,不等了")
    }
    fmt.Println("优雅退出")
}

func doCleanup(closed chan struct{}) {
    time.Sleep((time.Minute))
    close(closed)
}

使用chan也可以实现互斥锁。

在chan的内部实现中就有一把互斥锁保护着它的所有字段。从外在表现上chan的发送和接收之间也存在着happens-before的关系保证元素放进去之后receiver才能读取到关于happends-before的关系是指事件发生的先后顺序关系我会在下一讲详细介绍这里你只需要知道它是一种描述事件先后顺序的方法

要想使用chan实现互斥锁至少有两种方式。一种方式是先初始化一个capacity等于1的Channel然后再放入一个元素。这个元素就代表锁谁取得了这个元素就相当于获取了这把锁。另一种方式是先初始化一个capacity等于1的Channel它的“空槽”代表锁谁能成功地把元素发送到这个Channel谁就获取了这把锁。

这是使用Channel实现锁的两种不同实现方式我重点介绍下第一种。理解了这种实现方式第二种方式也就很容易掌握了我就不多说了。

// 使用chan实现互斥锁
type Mutex struct {
    ch chan struct{}
}

// 使用锁需要初始化
func NewMutex() *Mutex {
    mu := &Mutex{make(chan struct{}, 1)}
    mu.ch <- struct{}{}
    return mu
}

// 请求锁,直到获取到
func (m *Mutex) Lock() {
    <-m.ch
}

// 解锁
func (m *Mutex) Unlock() {
    select {
    case m.ch <- struct{}{}:
    default:
        panic("unlock of unlocked mutex")
    }
}

// 尝试获取锁
func (m *Mutex) TryLock() bool {
    select {
    case <-m.ch:
        return true
    default:
    }
    return false
}

// 加入一个超时的设置
func (m *Mutex) LockTimeout(timeout time.Duration) bool {
    timer := time.NewTimer(timeout)
    select {
    case <-m.ch:
        timer.Stop()
        return true
    case <-timer.C:
    }
    return false
}

// 锁是否已被持有
func (m *Mutex) IsLocked() bool {
    return len(m.ch) == 0
}


func main() {
    m := NewMutex()
    ok := m.TryLock()
    fmt.Printf("locked v %v\n", ok)
    ok = m.TryLock()
    fmt.Printf("locked %v\n", ok)
}

你可以用buffer等于1的chan实现互斥锁在初始化这个锁的时候往Channel中先塞入一个元素谁把这个元素取走谁就获取了这把锁把元素放回去就是释放了锁。元素在放回到chan之前不会有goroutine能从chan中取出元素的这就保证了互斥性。

在这段代码中还有一点需要我们注意下利用select+chan的方式很容易实现TryLock、Timeout的功能。具体来说就是在select语句中我们可以使用default实现TryLock使用一个Timer来实现Timeout的功能。

任务编排

前面所说的消息交流的场景是一个特殊的任务编排的场景,这个“击鼓传花”的模式也被称为流水线模式。

第6讲我们学习了WaitGroup我们可以利用它实现等待模式启动一组goroutine执行任务然后等待这些任务都完成。其实我们也可以使用chan实现WaitGroup的功能。这个比较简单我就不举例子了接下来我介绍几种更复杂的编排模式。

这里的编排既指安排goroutine按照指定的顺序执行也指多个chan按照指定的方式组合处理的方式。goroutine的编排类似“击鼓传花”的例子我们通过编排数据在chan之间的流转就可以控制goroutine的执行。接下来我来重点介绍下多个chan的编排方式总共5种分别是Or-Done模式、扇入模式、扇出模式、Stream和map-reduce。

Or-Done模式

首先来看Or-Done模式。Or-Done模式是信号通知模式中更宽泛的一种模式。这里提到了“信号通知模式”我先来解释一下。

我们会使用“信号通知”实现某个任务执行完成后的通知机制在实现时我们为这个任务定义一个类型为chan struct{}类型的done变量等任务结束后我们就可以close这个变量然后其它receiver就会收到这个通知。

这是有一个任务的情况如果有多个任务只要有任意一个任务执行完我们就想获得这个信号这就是Or-Done模式。

比如,你发送同一个请求到多个微服务节点,只要任意一个微服务节点返回结果,就算成功,这个时候,就可以参考下面的实现:

func or(channels ...<-chan interface{}) <-chan interface{} {
    // 特殊情况只有零个或者1个chan
    switch len(channels) {
    case 0:
        return nil
    case 1:
        return channels[0]
    }

    orDone := make(chan interface{})
    go func() {
        defer close(orDone)

        switch len(channels) {
        case 2: // 2个也是一种特殊情况
            select {
            case <-channels[0]:
            case <-channels[1]:
            }
        default: //超过两个,二分法递归处理
            m := len(channels) / 2
            select {
            case <-or(channels[:m]...):
            case <-or(channels[m:]...):
            }
        }
    }()

    return orDone
}

我们可以写一个测试程序测试它:

func sig(after time.Duration) <-chan interface{} {
    c := make(chan interface{})
    go func() {
        defer close(c)
        time.Sleep(after)
    }()
    return c
}


func main() {
    start := time.Now()

    <-or(
        sig(10*time.Second),
        sig(20*time.Second),
        sig(30*time.Second),
        sig(40*time.Second),
        sig(50*time.Second),
        sig(01*time.Minute),
    )

    fmt.Printf("done after %v", time.Since(start))
}

这里的实现使用了一个巧妙的方式,当chan的数量大于2时使用递归的方式等待信号

在chan数量比较多的情况下递归并不是一个很好的解决方式根据这一讲最开始介绍的反射的方法我们也可以实现Or-Done模式

func or(channels ...<-chan interface{}) <-chan interface{} {
    //特殊情况只有0个或者1个
    switch len(channels) {
    case 0:
        return nil
    case 1:
        return channels[0]
    }

    orDone := make(chan interface{})
    go func() {
        defer close(orDone)
        // 利用反射构建SelectCase
        var cases []reflect.SelectCase
        for _, c := range channels {
            cases = append(cases, reflect.SelectCase{
                Dir:  reflect.SelectRecv,
                Chan: reflect.ValueOf(c),
            })
        }

        // 随机选择一个可用的case
        reflect.Select(cases)
    }()


    return orDone
}

这是递归和反射两种方法实现Or-Done模式的代码。反射方式避免了深层递归的情况可以处理有大量chan的情况。其实最笨的一种方法就是为每一个Channel启动一个goroutine不过这会启动非常多的goroutine太多的goroutine会影响性能所以不太常用。你只要知道这种用法就行了不用重点掌握。

扇入模式

扇入借鉴了数字电路的概念,它定义了单个逻辑门能够接受的数字信号输入最大量的术语。一个逻辑门可以有多个输入,一个输出。

在软件工程中模块的扇入是指有多少个上级模块调用它。而对于我们这里的Channel扇入模式来说就是指有多个源Channel输入、一个目的Channel输出的情况。扇入比就是源Channel数量比1。

每个源Channel的元素都会发送给目标Channel相当于目标Channel的receiver只需要监听目标Channel就可以接收所有发送给源Channel的数据。

扇入模式也可以使用反射、递归或者是用最笨的每个goroutine处理一个Channel的方式来实现。

这里我列举下递归和反射的方式,帮你加深一下对这个技巧的理解。

反射的代码比较简短易于理解主要就是构造出SelectCase slice然后传递给reflect.Select语句。

func fanInReflect(chans ...<-chan interface{}) <-chan interface{} {
    out := make(chan interface{})
    go func() {
        defer close(out)
        // 构造SelectCase slice
        var cases []reflect.SelectCase
        for _, c := range chans {
            cases = append(cases, reflect.SelectCase{
                Dir:  reflect.SelectRecv,
                Chan: reflect.ValueOf(c),
            })
        }
        
        // 循环从cases中选择一个可用的
        for len(cases) > 0 {
            i, v, ok := reflect.Select(cases)
            if !ok { // 此channel已经close
                cases = append(cases[:i], cases[i+1:]...)
                continue
            }
            out <- v.Interface()
        }
    }()
    return out
}

递归模式也是在Channel大于2时采用二分法递归merge。

func fanInRec(chans ...<-chan interface{}) <-chan interface{} {
    switch len(chans) {
    case 0:
        c := make(chan interface{})
        close(c)
        return c
    case 1:
        return chans[0]
    case 2:
        return mergeTwo(chans[0], chans[1])
    default:
        m := len(chans) / 2
        return mergeTwo(
            fanInRec(chans[:m]...),
            fanInRec(chans[m:]...))
    }
}

这里有一个mergeTwo的方法是将两个Channel合并成一个Channel是扇入形式的一种特例只处理两个Channel。 下面我来借助一段代码帮你理解下这个方法。

func mergeTwo(a, b <-chan interface{}) <-chan interface{} {
    c := make(chan interface{})
    go func() {
        defer close(c)
        for a != nil || b != nil { //只要还有可读的chan
            select {
            case v, ok := <-a:
                if !ok { // a 已关闭设置为nil
                    a = nil
                    continue
                }
                c <- v
            case v, ok := <-b:
                if !ok { // b 已关闭设置为nil
                    b = nil
                    continue
                }
                c <- v
            }
        }
    }()
    return c
}

扇出模式

有扇入模式,就有扇出模式,扇出模式是和扇入模式相反的。

扇出模式只有一个输入源Channel有多个目标Channel扇出比就是1比目标Channel数的值经常用在设计模式中的观察者模式中(观察者设计模式定义了对象间的一种一对多的组合关系。这样一来,一个对象的状态发生变化时,所有依赖于它的对象都会得到通知并自动刷新)。在观察者模式中,数据变动后,多个观察者都会收到这个变更信号。

下面是一个扇出模式的实现。从源Channel取出一个数据后依次发送给目标Channel。在发送给目标Channel的时候可以同步发送也可以异步发送

func fanOut(ch <-chan interface{}, out []chan interface{}, async bool) {
    go func() {
        defer func() { //退出时关闭所有的输出chan
            for i := 0; i < len(out); i++ {
                close(out[i])
            }
        }()

        for v := range ch { // 从输入chan中读取数据
            v := v
            for i := 0; i < len(out); i++ {
                i := i
                if async { //异步
                    go func() {
                        out[i] <- v // 放入到输出chan中,异步方式
                    }()
                } else {
                    out[i] <- v // 放入到输出chan中同步方式
                }
            }
        }
    }()
}

你也可以尝试使用反射的方式来实现,我就不列相关代码了,希望你课后可以自己思考下。

Stream

这里我来介绍一种把Channel当作流式管道使用的方式也就是把Channel看作流Stream提供跳过几个元素或者是只取其中的几个元素等方法。

首先我们提供创建流的方法。这个方法把一个数据slice转换成流

func asStream(done <-chan struct{}, values ...interface{}) <-chan interface{} {
    s := make(chan interface{}) //创建一个unbuffered的channel
    go func() { // 启动一个goroutine往s中塞数据
        defer close(s) // 退出时关闭chan
        for _, v := range values { // 遍历数组
            select {
            case <-done:
                return
            case s <- v: // 将数组元素塞入到chan中
            }
        }
    }()
    return s
}

流创建好以后,该咋处理呢?下面我再给你介绍下实现流的方法。

  1. takeN只取流中的前n个数据
  2. takeFn筛选流中的数据只保留满足条件的数据
  3. takeWhile只取前面满足条件的数据一旦不满足条件就不再取
  4. skipN跳过流中前几个数据
  5. skipFn跳过满足条件的数据
  6. skipWhile跳过前面满足条件的数据一旦不满足条件当前这个元素和以后的元素都会输出给Channel的receiver。

这些方法的实现很类似我们以takeN为例来具体解释一下。

func takeN(done <-chan struct{}, valueStream <-chan interface{}, num int) <-chan interface{} {
    takeStream := make(chan interface{}) // 创建输出流
    go func() {
        defer close(takeStream)
        for i := 0; i < num; i++ { // 只读取前num个元素
            select {
            case <-done:
                return
            case takeStream <- <-valueStream: //从输入流中读取元素
            }
        }
    }()
    return takeStream
}

map-reduce

map-reduce是一种处理数据的方式最早是由Google公司研究提出的一种面向大规模数据处理的并行计算模型和方法开源的版本是hadoop前几年比较火。

不过我要讲的并不是分布式的map-reduce而是单机单进程的map-reduce方法。

map-reduce分为两个步骤第一步是映射map处理队列中的数据第二步是规约reduce把列表中的每一个元素按照一定的处理方式处理成结果放入到结果队列中。

就像做汉堡一样map就是单独处理每一种食材reduce就是从每一份食材中取一部分做成一个汉堡。

我们先来看下map函数的处理逻辑:

func mapChan(in <-chan interface{}, fn func(interface{}) interface{}) <-chan interface{} {
    out := make(chan interface{}) //创建一个输出chan
    if in == nil { // 异常检查
        close(out)
        return out
    }

    go func() { // 启动一个goroutine,实现map的主要逻辑
        defer close(out)
        for v := range in { // 从输入chan读取数据执行业务操作也就是map操作
            out <- fn(v)
        }
    }()

    return out
}

reduce函数的处理逻辑如下

func reduce(in <-chan interface{}, fn func(r, v interface{}) interface{}) interface{} {
    if in == nil { // 异常检查
        return nil
    }

    out := <-in // 先读取第一个元素
    for v := range in { // 实现reduce的主要逻辑
        out = fn(out, v)
    }

    return out
}

我们可以写一个程序这个程序使用map-reduce模式处理一组整数map函数就是为每个整数乘以10reduce函数就是把map处理的结果累加起来

// 生成一个数据流
func asStream(done <-chan struct{}) <-chan interface{} {
    s := make(chan interface{})
    values := []int{1, 2, 3, 4, 5}
    go func() {
        defer close(s)
        for _, v := range values { // 从数组生成
            select {
            case <-done:
                return
            case s <- v:
            }
        }
    }()
    return s
}

func main() {
    in := asStream(nil)

    // map操作: 乘以10
    mapFn := func(v interface{}) interface{} {
        return v.(int) * 10
    }

    // reduce操作: 对map的结果进行累加
    reduceFn := func(r, v interface{}) interface{} {
        return r.(int) + v.(int)
    }

    sum := reduce(mapChan(in, mapFn), reduceFn) //返回累加结果
    fmt.Println(sum)
}

总结

这节课我借助代码示例带你学习了Channel的应用场景和应用模式。这几种模式不是我们学习的终点而是学习的起点。掌握了这几种模式之后我们可以延伸出更多的模式。

虽然Channel最初是基于CSP设计的用于goroutine之间的消息传递的一种数据类型但是除了消息传递这个功能之外大家居然还演化出了各式各样的应用模式。我不确定Go的创始人在设计这个类型的时候有没有想到这一点但是我确实被各位大牛利用Channel的各种点子折服了比如有人实现了一个基于TCP网络的分布式的Channel。

在使用Go开发程序的时候你也不妨多考虑考虑是否能够使用chan类型看看你是不是也能创造出别具一格的应用模式。

思考题

想一想我们在利用chan实现互斥锁的时候如果buffer设置的不是1而是一个更大的值会出现什么状况吗能解决什么问题吗

欢迎在留言区写下你的思考和答案,我们一起交流讨论。如果你觉得有所收获,也欢迎你把今天的内容分享给你的朋友或同事。