golang并发三板斧系列之一:channel用于通信和同步

某不知名程序员曾说过:

我们爱golang,很大程度是因为爱它的并发。

我们来总结下golang的并发三板斧,channel、pool、context。一路看下来,套路也就那么几个。

Concurrency is not parallelism

首先必须牢记一个重要的概念:并发不是并行

并发是描述代码架构,是指在模型上可以同时处理多件事务。并行是描述执行状态,除了代码限制外,如果没有多CPU多核心,谈何并行呢?对于单CPU单核心的硬件(当然现在基本上很少了),并发往往带来更多的进程间切换,反而会拖累效率。

那到底要不要并行?或者说什么情况下并行的收益最大?关键在于我们的模型是CPU密集型还是IO密集型的,一般来讲前者更适用于并行。听起来好像有点反常识,这么来思考一下:

  • CPU密集型不会主动释放系统线程,人为释放的话会有时间成本。利用并行的话,可以尽量让CPU密集型的任务独占一个系统线程。
  • IO密集型本身遇到系统调用或者网络阻塞就会释放系统线程,所以单纯的并发模型就能满足需要了,可以有效利用单一的系统线程。

本系列文章只谈golang的代码模型,因此取并发二字。下图很直观的区分了并发和并行:

image

goroutine

这大概是golang中最吸引人的地方了。

[图片上传失败...(image-b09ed-1557402795588)]

启动goroutine

关键字go启动一个goroutine,新手容易犯以下的错误,使得goroutine没有真正运行:

func testBoring(msg string) {
    for i := 0; ; i++ {
        log.Printf("%s %d", msg, i)
        time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond)
    }
}

func TestBasicGo(t *testing.T) {
    go testBoring("boring!")
}

主进程在启动的goroutine并没有运行到就退出了,因此没有效果,不会有打印。

等待goroutine执行

func testBoring(msg string) {
    for i := 0; ; i++ {
        log.Printf("%s %d", msg, i)
        time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond)
    }
}

func TestGoAndWait(t *testing.T) {
    go testBoring("boring!")

    log.Printf("I'm listening.")
    time.Sleep(time.Second)
    log.Printf("I'm leaving.")
}

如此,手动用time.Sleep阻塞主进程,使得启动的goroutine能开始执行:

2019/02/17 17:55:33 I'm listening.
2019/02/17 17:55:33 boring! 0
2019/02/17 17:55:34 boring! 1
2019/02/17 17:55:34 boring! 2
2019/02/17 17:55:34 I'm leaving.

Process finished with exit code 0

goroutine执行时机

那么go出去的goroutine到底什么时候能执行到呢?golang的调度器是非抢占式的,在GPM的架构里,Waiting态的goroutine(在LRQ或者GRQ中等待执行)必须要等Executing态的goroutine主动退出执行,才能绑定到M上执行。主动退出的方式有很多,比如time.Sleep(),比如runtime.Gosched()。GPM架构可以参考很多资料比如这篇。

因此单核心或者多核心在调度goroutine的时候可能会有很大差异,如下示例:

func TestGoFuncParam(t *testing.T) {
    for i := 0; i < 10; i++ {
        go func() {
            log.Println(i)
        }()
    }
    time.Sleep(100 * time.Millisecond)
    log.Print("xxxxxxxxxxxxxx")

    for i := 0; i < 10; i++ {
        go func(n *int) {
            log.Println(*n)
        }(&i)
    }
    time.Sleep(100 * time.Millisecond)
}

代码在单核心运行时(注意-cpu 1),结果是确定的,所有启动的goroutine都是等主进程sleep的时候才能切换执行:

C02S259EFVH3:go_concurrency baixiao$ go test -cpu 1 -run TestGoFuncParam
2019/02/17 18:20:10 10
2019/02/17 18:20:10 10
2019/02/17 18:20:10 10
2019/02/17 18:20:10 10
2019/02/17 18:20:10 10
2019/02/17 18:20:10 10
2019/02/17 18:20:10 10
2019/02/17 18:20:10 10
2019/02/17 18:20:10 10
2019/02/17 18:20:10 10
2019/02/17 18:20:10 xxxxxxxxxxxxxx
2019/02/17 18:20:10 10
2019/02/17 18:20:10 10
2019/02/17 18:20:10 10
2019/02/17 18:20:10 10
2019/02/17 18:20:10 10
2019/02/17 18:20:10 10
2019/02/17 18:20:10 10
2019/02/17 18:20:10 10
2019/02/17 18:20:10 10
2019/02/17 18:20:10 10
PASS
ok      _/Users/baixiao/Go/src/github.com/baixiaoustc/go_concurrency    0.214s

在多核心时则不同,for循环里的第一个goroutine可能在主进程走完for代码之前就执行到(每次运行可能结果不同):

C02S259EFVH3:go_concurrency baixiao$ go test -cpu 8 -run TestGoFuncParam
2019/02/17 18:20:24 7
2019/02/17 18:20:24 10
2019/02/17 18:20:24 10
2019/02/17 18:20:24 10
2019/02/17 18:20:24 10
2019/02/17 18:20:24 10
2019/02/17 18:20:24 10
2019/02/17 18:20:24 10
2019/02/17 18:20:24 10
2019/02/17 18:20:24 10
2019/02/17 18:20:25 xxxxxxxxxxxxxx
2019/02/17 18:20:25 1
2019/02/17 18:20:25 10
2019/02/17 18:20:25 10
2019/02/17 18:20:25 10
2019/02/17 18:20:25 10
2019/02/17 18:20:25 10
2019/02/17 18:20:25 10
2019/02/17 18:20:25 10
2019/02/17 18:20:25 10
2019/02/17 18:20:25 10
PASS
ok      _/Users/baixiao/Go/src/github.com/baixiaoustc/go_concurrency    0.219s

channel:goroutine间的通信

搞并发编程,怎么能没有进程间(线程间/协程间)通信呢?

goroutine需要通信

在上例TestGoAndWait中,我们假装主goroutine听到了子goroutine的话(注意这里的子goroutine仅用于方便描述,并不像多进程里面有父子关系,下同)。实则不然。要这两个goroutine通信,我们需要channel。来一个最简单的示例,当然这是一个死循环:

func testBoringWithChannel(msg string, c chan string) {
    for i := 0; ; i++ {
        c <- fmt.Sprintf("%s %d", msg, i)
        time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond)
    }
}

func TestGoWithChannel(t *testing.T) {
    ch := make(chan string)
    go testBoringWithChannel("boring!", ch)
    for c := range ch {
        log.Printf("You say: %s", c)
    }
}

关闭channel

抛开死循环,怎么在子goroutine结束时告知主goroutine呢?直接退出是会造成死锁的!

fatal error: all goroutines are asleep - deadlock!

此时需要在写端关闭channel:

func testBoringWithChannelClose(msg string, c chan string) {
    for i := 0; i < 5; i++ {
        c <- fmt.Sprintf("%s %d", msg, i)
        time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond)
    }
    close(c)
}

func TestGoWithChannelClose(t *testing.T) {
    ch := make(chan string)
    go testBoringWithChannelClose("boring!", ch)
    for c := range ch {
        log.Printf("You say: %s", c)
    }
}
2019/02/19 21:35:23 You say: boring! 0
2019/02/19 21:35:23 You say: boring! 1
2019/02/19 21:35:24 You say: boring! 2
2019/02/19 21:35:24 You say: boring! 3
2019/02/19 21:35:24 You say: boring! 4

Process finished with exit code 0

for range可以自动监测到channel关闭,然后自动退出。但是需要强调的是关闭后的channel不能再写入,如下是个反例:

func TestGoWithChannelCloseThenWrite(t *testing.T) {
    ch := make(chan string)
    go testBoringWithChannelClose("boring!", ch)
    for c := range ch {
        log.Printf("You say: %s", c)
    }
    ch <- "after close"
}
2019/02/18 23:12:31 receive boring! 0
2019/02/18 23:12:31 receive boring! 1
2019/02/18 23:12:32 receive boring! 2
2019/02/18 23:12:32 receive boring! 3
2019/02/18 23:12:32 receive boring! 4
panic: send on closed channel [recovered]
    panic: send on closed channel

goroutine 5 [running]:
testing.tRunner.func1(0xc4200aa0f0)
    /usr/local/go/src/testing/testing.go:711 +0x2d2
panic(0x1114e40, 0x1151930)
    /usr/local/go/src/runtime/panic.go:491 +0x283
github.com/baixiaoustc/go_concurrency.TestGoWithChannelCloseThenWrite(0xc4200aa0f0)
    /Users/baixiao/Go/src/github.com/baixiaoustc/go_concurrency/first_post_test.go:80 +0x167
testing.tRunner(0xc4200aa0f0, 0x1141a58)
    /usr/local/go/src/testing/testing.go:746 +0xd0
created by testing.(*T).Run
    /usr/local/go/src/testing/testing.go:789 +0x2de

Process finished with exit code 2

还需要记住一点,关闭后的channel不能再写入,但是可以继续读,读出来都是定义类型的空值。

只读channel

如何简单地避免上述误操作呢?可以用只读channel,生产者和消费者严格定义好,生产者只写channel,消费者只读channel,如果消费者有写入操作的话在编译时就会报错。但是只读channel的写法有一点技巧,如下的写法就不行,直接编译失败:

func TestReceiveChannelWrong(t *testing.T) {
    ch := make(<-chan int)
    go func() {
        ch <- 1
    }()
    a := <-ch
    log.Println(a)
}
./first_post_test.go:86:6: invalid operation: ch <- 1 (send to receive-only type <-chan int)

Process finished with exit code 2

正确的写法如下,通过函数的返回值限定「只读属性」:

func TestReceiveChannelRight(t *testing.T) {
    ch := func() <-chan int {
        ch := make(chan int)
        go func() {
            ch <- 2
        }()
        return ch
    }()
    a := <-ch
    log.Println(a)
}

channel with select

那么当主goroutine要启动多个子goroutine干不同的事呢?主goroutine不可能依次阻塞到不同的channel上,串行地等待子goroutine依次完工,这样太丑了:

func TestMultiChannelsSerial(t *testing.T) {
    ch1 := make(chan string)
    go testBoringWithChannelClose("boring!", ch1)
    ch2 := make(chan string)
    go testBoringWithChannelClose("funning!", ch2)

    for b := range ch1 {
        log.Printf("You say: %s", b)
    }
    for b := range ch2 {
        log.Printf("You say: %s", b)
    }
}

golang提供了select关键字,提供了多路复用的能力,同时处理多个channel。为了让主goroutine不是一直死循环等,而是在其多个子goroutine完工后继续往下走,这里用到了两个重要特性:

  • 使用_,ok判断channel是否关闭
  • 当通道为nil时,对应的case永远为阻塞
func TestMultiChannelsConcurrently(t *testing.T) {
    ch1 := make(chan string)
    go testBoringWithChannelClose("boring!", ch1)
    ch2 := make(chan string)
    go testBoringWithChannelClose("funning!", ch2)

    for {
        select {
        case c1, ok := <-ch1:
            if !ok {
                ch1 = nil
                log.Print("close ch1")
                continue
            }
            log.Printf("You say: %s", c1)
        case c2, ok := <-ch2:
            if !ok {
                ch2 = nil
                log.Print("close ch2")
                continue
            }
            log.Printf("You say: %s", c2)
        default:
            break
        }

        if ch1 == nil && ch2 == nil {
            break
        }
    }

    log.Print("go on")
}
2019/02/19 21:32:18 You say: funning! 0
2019/02/19 21:32:18 You say: boring! 0
2019/02/19 21:32:18 You say: funning! 1
2019/02/19 21:32:19 You say: boring! 1
2019/02/19 21:32:19 You say: funning! 2
2019/02/19 21:32:19 You say: boring! 2
2019/02/19 21:32:19 You say: funning! 3
2019/02/19 21:32:19 You say: boring! 3
2019/02/19 21:32:19 You say: funning! 4
2019/02/19 21:32:20 You say: boring! 4
2019/02/19 21:32:20 close ch2
2019/02/19 21:32:20 close ch1

2019/02/19 21:32:20 go on

这里值得一提的是,funning和boring肯定是交替出现的,但是第一个是funning还是boring是随机的,因为select还有一个特征是,当多个case同时满足的时候,随机地选一个执行。

构造定时器

channel + select 的另一个经典运用是超时管理:

func TestTimeOutEach(t *testing.T) {
    ch := make(chan string)
    go testBoringWithChannel("boring!", ch)
    for i := 0; i < 5; i++ {
        select {
        case c := <-ch:
            log.Printf("You say: %s", c)
        case <-time.After(500 * time.Millisecond):
            log.Println("You talk too slow.")
        }
    }
}
2019/02/19 21:08:14 You say: boring! 0
2019/02/19 21:08:14 You say: boring! 1
2019/02/19 21:08:15 You talk too slow.
2019/02/19 21:08:15 You say: boring! 2
2019/02/19 21:08:15 You talk too slow.

Process finished with exit code 0

上述是针对每次循环内部的超时,如果要对整个会话进行管理:

func TestTimeOutWhole(t *testing.T) {
    ch := make(chan string)
    timeout := time.After(1 * time.Second)
    go testBoringWithChannel("boring!", ch)
    for i := 0; i < 5; i++ {
        select {
        case c := <-ch:
            log.Printf("You say: %s", c)
        case <-timeout:
            log.Println("You talk too much.")
            return
        }
    }
}

channel生成器

还有一个比较常用的方法在上面的只读channel也提到了,雅称channel生成器:

func TestChannelGenerator(t *testing.T) {
    ch := testBoringWithChannelGenerate("boring!")
    for i := 0; i < 5; i++ {
        log.Printf("You say: %q\n", <-ch)
    }
    log.Println("I'm leaving.")
}
2019/02/19 21:20:40 You say: "boring! 0"
2019/02/19 21:20:40 You say: "boring! 1"
2019/02/19 21:20:41 You say: "boring! 2"
2019/02/19 21:20:41 You say: "boring! 3"
2019/02/19 21:20:42 You say: "boring! 4"

2019/02/19 21:20:42 I'm leaving.

channel用于退出

扩展上个示例,如果想在主goroutine退出之前告知子goroutine也退出呢?还是利用channel,只不过发送者和接受者对调了:

func testBoringWithChannelQuit(msg string, quit chan bool) <-chan string {
    c := make(chan string)
    go func() {
        for i := 0; ; i++ {
            select {
            case c <- fmt.Sprintf("%s %d", msg, i):
                time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond)
            case <-quit:
                return
            }

        }
    }()
    return c
}

func TestChannelQuit(t *testing.T) {
    quit := make(chan bool)
    c := testBoringWithChannelQuit("boring!", quit)
    for i := 0; i < 5; i++ {
        log.Printf("You say: %q\n", <-c)
    }
    log.Println("I'm leaving.")
    quit <- true
}

通信和同步

上面提到的channel都是没有buffer的,很大程度上起了「同步」的作用。发送者和接受者都必须等双方都准备好才能通信,实际上是一个同步模型。

后面会再提带buffer的channel。


本篇的模型可以类比为你现在是个师傅,带了个小学徒一起做事情。一开始双方没有通信,各搞各的,指挥学徒很麻烦。后面用上了channel,师傅能从学徒那里收到回馈了,也能定时触发一些事情了,也能由师傅指定学徒该下班了。小作坊就越开越顺了。


所有代码都在https://github.com/baixiaoustc/go_concurrency/blob/master/first_post_test.go中能找到。

原文载于golang并发三板斧系列之一:channel用于通信和同步

最后编辑于
?著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,100评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,308评论 3 388
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事?!?“怎么了?”我有些...
    开封第一讲书人阅读 159,718评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,275评论 1 287
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,376评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,454评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,464评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,248评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,686评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,974评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,150评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,817评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,484评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,140评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,374评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,012评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,041评论 2 351

推荐阅读更多精彩内容