0


通俗易懂剖析Go Channel:理解并发通信的核心机制

本文来自 Go就业训练营 小韬同学的投稿。

也强烈安利大家多写博客,不仅能倒逼自己学习总结,也能作为简历的加分项,提高求职面试的竞争力。

你想想看:面试官看到你简历中的博客主页有几十篇文章,几千粉丝是什么感觉。要比你空洞洞的写一句“热爱技术”强太多啦!

正文

我们在学习与使用Go语言的过程中,对

channel

并不陌生,

channel

是Go语言与众不同的特性之一,也是非常重要的一环,深入理解

Channel

,相信能够在使用的时候更加的得心应手。

一、Channel基本用法

1、channel类别

channel

在类型上,可以分为两种:

  • 双向channel:既能接收又能发送的channel
  • 单向channel:只能发送或只能接收的channel,即单向channel可以为分为: - 只写channel- 只读channel

声明并初始化如下如下:

funcmain(){// 声明并初始化  var ch chanstring=make(chanstring)// 双向channel  var readCh <-chanstring=make(<-chanstring)// 只读channel  var writeCh chan<-string=make(chan<-string)// 只写channel  }

上述定义中,

<-

表示单向的

channel

。如果箭头指向

chan

,就表示只写

channel

,可以往

chan

里边写入数据;如果箭头远离

chan

,则表示为只读

channel

,可以从

chan

读数据。

在定义channel时,可以定义任意类型的channel,因此也同样可以定义chan类型的channel。例如:

a :=make(chan<-chanint)// 定义类型为 chan int 的写channel  
b :=make(chan<-<-chanint)// 定义类型为 <-chan int 的写channel  
c :=make(<-chan<-chanint)// 定义类型为 <-chan int 的读channel  
d :=make(chan(<-chanint))// 定义类型为 (<-chan int) 的读channel  

channel

未初始化时,其**零值为

nil

nil 是 chan 的零值,是一种特殊的 chan,对值是 nil 的 chan 的发送接收调用者总是会阻塞。**

funcmain(){var ch chanstring  
    fmt.Println(ch)// <nil>  }

通过

make

我们可以初始化一个channel,并且可以设置其容量的大小,如下初始化了一个类型为

string

,其容量大小为

512

channel

var ch chanstring=make(chanstring,512)

当初始化定义了

channel

的容量,则这样的

channel

叫做

buffered chan

,即**有缓冲

channel

**。如果没有设置容量,

channel

的容量为0,这样的

channel

叫做

unbuffered chan

,即**无缓冲

channel

**。

有缓冲

channel

中,如果

channel

中还有数据,则从这个

channel

接收数据时不会被阻塞。如果

channel

的容量还未满,那么向这个

channel

发送数据也不会被阻塞,反之则会被阻塞。

无缓冲

channel

则只有当读写操作都准备好后,才不会阻塞,这也是

unbuffered chan

在使用过程中非常需要注意的一点,否则可能会出现常见的bug。

channel的常见操作:

  1. 发送数据

往channel发送一个数据使用

ch <-
funcmain(){var ch chanint=make(chanint,512)  
    ch <-2000}

上述的

ch

可以是

chan int

类型,也可以是单向

chan <-int

  1. 接收数据

从channel接收一条数据可以使用

<-ch
funcmain(){var ch chanint=make(chanint,512)  
    ch <-2000// 发送数据  
  
    data :=<-ch // 接收数据  
    fmt.Println(data)// 2000  }

ch 类型是

chan T

,也可以是单向

<-chan T

在接收数据时,可以返回两个返回值。**第一个返回值返回

channel

中的元素,第二个返回值为

bool

类型**,表示是否成功地从

channel

中读取到一个值。

如果第二个参数是

false

,则**表示

channel

已经被

close

而且

channel

中没有缓存的数据**,这个时候第一个值返回的是零值。

funcmain(){var ch chanint=make(chanint,512)  
    ch <-2000// 发送数据  
  
    data1, ok1 :=<-ch // 接收数据  
    fmt.Printf("data1 = %d, ok1 = %t\n", data1, ok1)// data1 = 2000, ok1 = true  close(ch)// 关闭channel  
    data2, ok2 :=<-ch  // 接收数据  
    fmt.Printf("data2 = %d, ok2 = %t", data2, ok2)// data2 = 0, ok2 = false  }

所以,如果从

channel

读取到一个零值,可能是发送操作真正发送的零值,也可能是

closed

关闭

channel

并且

channel

没有缓存元素产生的零值,这是需要注意判别的一个点。

  1. 其他操作

Go内建的函数

close

cap

len

都可以对

chan

类型进行操作。

  • close:关闭channel。
  • cap:返回channel的容量。
  • len:返回channel缓存中还未被取走的元素数量。
funcmain(){var ch chanint=make(chanint,512)  
    ch <-100  
    ch <-200  
    fmt.Println("ch len:",len(ch))// ch len: 2  
    fmt.Println("ch cap:",cap(ch))// ch cap: 512  }

发送操作接收操作可以作为

select

语句中的

case clause

,例如:

funcmain(){var ch =make(chanint,512)for i :=0; i <10; i++{select{case ch <- i:case v :=<-ch:  
          fmt.Println(v)}}}
for-range

语句同样可以在

chan

中使用,例如:

funcmain(){var ch =make(chanint,512)  
    ch <-100  
    ch <-200  
    ch <-300for v :=range ch {  
       fmt.Println(v)}}// 执行结果  100200300

2、select介绍

在Go语言中,

select

语句用于监控一组

case

语句,根据特定的条件执行相对应的

case

语句或

default

语句,与

switch

类似,但不同之处在于

select

语句中所有

case

中的表达式都必须是

channel

的发送或接收操作。

select

使用示例代码如下:

select{case<-ch1:  
    fmt.Println("ch1")case ch2 <-1:  
    fmt.Println("ch2")}

上述代码中,

select

关键字让当前

goroutine

同时等待

ch1

的可读和

ch2

的可写,在满足任意一个

case

分支之前,

select

会一直阻塞下去,直到其中的一个

channel

转为就绪状态时执行对应

case

分支的代码。如果多个

channel

同时就绪的话则随机选择一个

case

执行。

当使用空

select

时,空的

select

语句会直接阻塞当前的

goroutine

,使得该

goroutine

进入无法被唤醒的永久休眠状态。空

select

,即

select

内不包含任何

case

select{  
    
}  

另外当

select

语句内只有一个

case

分支时,如果该

case

分支不满足,那么当前

select

就变成了一个阻塞的

channel

读/写操作。

select{case<-ch1:  
    fmt.Println("ch1")}

上述

select

中,当

ch1

可读时,会执行打印操作,反之则阻塞当前

goroutine

select

语句内包含

default

分支时,如果

select

内的所有

case

都不满足,则会执行

default

分支的逻辑,用于当其他

case

都不满足时执行一些默认操作。

select{case<-ch1:  
    fmt.Println("ch1")case ch2 <-1:  
    fmt.Println("ch2")default:  
    fmt.Println("default")}

上述代码中,当

ch1

可读或

ch2

可写时,会执行相应的打印操作,否则就执行

default

语句中的代码,相当于一个非阻塞的

channel

读取操作。

select

的使用可以总结为:

  • select不存在任何的case且没有default分支:永久阻塞当前 goroutine;
  • select只存在一个case且没有default分支:阻塞的发送/接收;
  • select存在多个case:随机选择一个满足条件的case执行;
  • select存在default,其他case都不满足时:执行default语句中的代码;

二、Channel实现原理

从代码的角度剖析

channel

的实现,能够让我们更好的去使用

channel

我们可以从

chan

类型的数据结构、初始化以及三个操作发送、接收和关闭这几个方面来了解

channel

1、chan数据结构

chan类型的数据结构定义位于runtime.hchan,其结构体定义如下:

type hchan struct{  
    qcount   uint// total data in the queue  
    dataqsiz uint// size of the circular queue  
    buf      unsafe.Pointer // points to an array of dataqsiz elements  
    elemsize uint16  
    closed   uint32  
    elemtype *_type // element type  
    sendx    uint// send index  
    recvx    uint// receive index  
    recvq    waitq  // list of recv waiters  
    sendq    waitq  // list of send waiters  // lock protects all fields in hchan, as well as several  // fields in sudogs blocked on this channel.  //  // Do not change another G's status while holding this lock  // (in particular, do not ready a G), as this can deadlock  // with stack shrinking.  
    lock mutex  
}

解释一下上述各个字段的意义:

  • qcount:表示chan中已经接收到的数据且还未被取走的元素个数。内建函数len可以返回这个字段的值。
  • datasiz:循环队列的大小。chan在实现上使用一个循环队列来存放元素的个数,循环队列适用于生产者-消费者的场景。
  • buf:存放元素的循环队列bufferbuf 字段是一个指向队列缓冲区的指针,即指向一个dataqsiz元素的数组。buf 字段是使用 unsafe.Pointer 类型来表示队列缓冲区的起始地址。unsafe.Pointer是一种特殊的指针类型,它可以用于指向任何类型的数据。由于队列缓冲区的类型是动态分配的,所以不能直接使用某个具体类型的指针来表示。
  • elemtypeelemsizeelemtype表示chan中元素的数据类型,elemsize表示其大小。当chan定义后,它的元素类型是固定的,即普通类型或者指针类型,因此元素大小也是固定的。
  • sendx:处理发送数据操作的指针在buf队列中的位置。当channel接收到了新的数据时,该指针就会加上elemsize,移动到下一个位置。buf 的总大小是elemsize的整数倍且buf是一个循环列表。
  • recvx:处理接收数据操作的指针在buf队列中的位置。当从buf中取出数据,此指针会移动到下一个位置。
  • recvq:当接收操作发现channel中没有数据可读时,会被则色,此时会被加入到recvq队列中。
  • sendq:当发送操作发现buf队列已满时,会被进行阻塞,此时会被加入到sendq队列中。

image.png

2、chan初始化

channel

在进行初始化时,Go编译器会根据是否传入容量的大小,来选择调用

makechan64

,还是

makechan

makechan64

在实现上底层还是调用

makechan

来进行初始化,

makechan64

只是对

size

做了检查。

makechan

函数根据

chan

的容量的大小和元素的类型不同,初始化不同的存储空间。省略一些检查代码,

makechan

函数的主要逻辑如下:

funcmakechan(t *chantype, size int)*hchan {  
    elem := t.elem  
      
    ...  
  
    mem, overflow := math.MulUintptr(elem.size,uintptr(size))...var c *hchan  
    switch{case mem ==0:// 队列或元素大小为零,不必创建buf  
       c =(*hchan)(mallocgc(hchanSize,nil,true))  
       c.buf = c.raceaddr()case elem.ptrdata ==0:// 元素不包含指针,分配一块连续的内存给hchan数据结构和buf  // hchan数据结构后面紧接着就是buf,在一次调用中分配hchan和buf  
       c =(*hchan)(mallocgc(hchanSize+mem,nil,true))  
       c.buf =add(unsafe.Pointer(c), hchanSize)default:// 元素包含指针,单独分配buf  
       c =new(hchan)  
       c.buf =mallocgc(mem, elem,true)}// 记录元素大小、类型、容量  
    c.elemsize =uint16(elem.size)  
    c.elemtype = elem  
    c.dataqsiz =uint(size)lockInit(&c.lock, lockRankHchan)...return c  
}

3、send发送操作

Go在编译发送数据给

channel

时,会把发送操作

send

转换成

chansend1

函数,而

chansend1

函数会调用

chansend

函数。

funcchansend1(c *hchan, elem unsafe.Pointer){chansend(c, elem,true,getcallerpc())}

我们可以来分段分析

chansend

函数的实现逻辑。

第一部分:

主要是对

chan

进行判断,判断

chan

是否为

nil

,若为

nil

,则判断是否需要将当前

goroutine

进行阻塞,阻塞通过

gopark

来对调用者

goroutine park

(阻塞休眠)。

funcchansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr)bool{// 第一部分  if c ==nil{// 判断chan是否为nil  if!block {// 判断是否需要阻塞当前goroutine  returnfalse}// 调用这goroutine park,进行阻塞休眠  gopark(nil,nil, waitReasonChanSendNilChan, traceEvGoStop,2)throw("unreachable")}...}

第二部分

第二部分的逻辑判断是当你往一个容量已满的

chan

实例发送数据,且不想当前调用的

goroutine

被阻塞时(

chan

未被关闭),那么处理的逻辑是直接返回。

funcchansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr)bool{...// 第二部分  if!block && c.closed ==0&&full(c){returnfalse}...}

第三部分

第三部分的逻辑判断是首先进行互斥锁加锁,然后判断当前

chan

是否关闭,如果

chan

已经被

close

了,则释放互斥锁并

panic

,即对已关闭的

chan

发送数据会

panic

funcchansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr)bool{...// 第三部分  lock(&c.lock)// 开始加锁  if c.closed !=0{// 判断channel是否关闭  unlock(&c.lock)panic(plainError("send on closed channel"))}...}

第四部分

第四部分的逻辑主要是判断接收队列中是否有正在等待的接收方

receiver

。如果存在正在等待的

receiver

(说明此时

buf

中没有缓存的数据),则将他从接收队列中弹出,直接将需要发送到

channel

的数据交给这个

receiver

,而无需放入到

buf

中,让发送操作速度更快一些。

funcchansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr)bool{...// 第四部分  if sg := c.recvq.dequeue(); sg !=nil{// 找到了一个正在等待的接收者。我们传递我们想要发送的值  // 直接传递给receiver接收者,绕过channel buf缓存区(如果receiver有的话)  send(c, sg, ep,func(){unlock(&c.lock)},3)returntrue}...}

第五部分

当等待队列中并没有正在等待的

receiver

,则说明当前

buf

还没有满,此时将发送的数据放入到

buf

中。

funcchansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr)bool{...// 第五部分  if c.qcount < c.dataqsiz {// 判断buf是否满了  // channel buf还有可用的空间. 将发送数据入buf循环队列.  
       qp :=chanbuf(c, c.sendx)if raceenabled {racenotify(c, c.sendx,nil)}typedmemmove(c.elemtype, qp, ep)  
       c.sendx++if c.sendx == c.dataqsiz {  
          c.sendx =0}  
       c.qcount++unlock(&c.lock)returntrue}...}

第六部分

当逻辑走到第六部分,说明正在处理

buf

已满的情况。如果

buf

已满,则发送操作的

goroutine

就会加入到发送者的等待队列,直到被唤醒。当

goroutine

被唤醒时,数据或者被取走了,或者

chan

已经被关闭了。

funcchansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr)bool{...// 第六部分  // chansend1函数调用不会进入if块里,因为chansend1的block=true  if!block {unlock(&c.lock)returnfalse}...  
      
    c.sendq.enqueue(mysg)// 加入发送队列  ...gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend,2)// 阻塞  ...}

4、recv接收操作

channel

中接收数据时,Go会将代码转换成

chanrecv1

函数。如果需要返回两个返回值,则会转换成

chanrecv2

chanrecv1

函数和

chanrecv2

都会调用

chanrecv

函数。

chanrecv1

chanrecv2

传入的

block

参数的值是

true

,两种调用都是阻塞方式,因此在分析

chanrecv

函数的实现时,可以不考虑

block=false

的情况。

// 从已编译代码中进入 <-c 的入口点  funcchanrecv1(c *hchan, elem unsafe.Pointer){chanrecv(c, elem,true)}funcchanrecv2(c *hchan, elem unsafe.Pointer)(received bool){_, received =chanrecv(c, elem,true)return}

同样,省略一些检查类的代码,我们也可以分段分析

chanrecv

函数的逻辑。

第一部分

第一部分主要判断当前进行接收操作的

chan

实例是否为

nil

,若为

nil

,则从

nil chan

中接收数据的调用这

goroutine

会被阻塞。

funcchanrecv(c *hchan, ep unsafe.Pointer, block bool)(selected, received bool){...// 第一部分  if c ==nil{// 判断chan是否为nil  if!block {// 是否阻塞,默认为block=true  return}// 进行阻塞  gopark(nil,nil, waitReasonChanReceiveNilChan, traceEvGoStop,2)throw("unreachable")}...}

第二部分
这一部分只要是考虑

block=false

c

为空的情况,

block=false

的情况我们可以不做考虑。

funcchanrecv(c *hchan, ep unsafe.Pointer, block bool)(selected, received bool){...// 检查未获得锁的失败非阻塞操作。  if!block &&empty(c){...}...}

第三部分

第三部分的逻辑为判断当前

chan

是否被关闭,若当前

chan

已经被

close

了,并且缓存队列中没有缓冲的元素时,返回

true

false

funcchanrecv(c *hchan, ep unsafe.Pointer, block bool)(selected, received bool){...lock(&c.lock)// 加锁,返回时释放锁  // 第三部分  if c.closed !=0{// 当chan已被关闭时  if c.qcount ==0{// 且 buf区 没有缓存的数据了  ...unlock(&c.lock)// 解锁  if ep !=nil{typedmemclr(c.elemtype, ep)}returntrue,false}}...}

第四部分

第四部分是处理通道未关闭且

buf

缓存队列已满的情况。只有当缓存队列已满时,才能够从发送等待队列获取到

sender

。若当前的

chan

unbuffer

chan

,即

无缓冲区channel

时,则直接将

sender

的发送数据传递给

receiver

。否则就从缓存队列的头部读取一个元素值,并将获取的

sender

携带的值加入到

buf

循环队列的尾部。

funcchanrecv(c *hchan, ep unsafe.Pointer, block bool)(selected, received bool){...if c.closed !=0{// 当chan已被关闭时  }else{// 第四部分,通道未关闭  // 如果sendq队列中有等待发送的sender  if sg := c.sendq.dequeue(); sg !=nil{// 存在正在等待的sender,如果缓存区的容量为0则直接将发送方的值传递给接收方  // 反之,则从缓存队列的头部获取数据,并将获取的sender的发送值加入到缓存队列尾部  recv(c, sg, ep,func(){unlock(&c.lock)},3)returntrue,true}}...}

第五部分

第五部分的主要逻辑是处理发送队列中没有等待的

sender

buf

中有缓存的数据。该段逻辑与外出的互斥锁共用一把锁,因此不存在并发问题。当

buf

缓存区有缓存元素时,则取出该元素传递给

receiver

,同时移动接收指针。

funcchanrecv(c *hchan, ep unsafe.Pointer, block bool)(selected, received bool){...// 第五部分  if c.qcount >0{// 发送队列中没有等待的sender,且buf中有缓存数据  // 直接从缓存队列中获取数据  
        qp :=chanbuf(c, c.recvx)if raceenabled {racenotify(c, c.recvx,nil)}if ep !=nil{typedmemmove(c.elemtype, ep, qp)}typedmemclr(c.elemtype, qp)  
        c.recvx++// 移动接收指针  if c.recvx == c.dataqsiz {// 指针若已到末尾则进行重置(循环队列)  
           c.recvx =0}  
        c.qcount--// 获取数据后,buf缓存区元素个数减一  unlock(&c.lock)// 解锁  returntrue,true}if!block {// block=true  unlock(&c.lock)returnfalse,false}...}

第六部分

第六部分的逻辑主要是处理

buf

缓存区中没有缓存数据的情况。当

buf

缓存区没有缓存数据时,那么当前的

receiver

就会被阻塞,直到它从

sender

中接收了数据,或者是

chan

close

,才会返回。

funcchanrecv(c *hchan, ep unsafe.Pointer, block bool)(selected, received bool){...  
    c.recvq.enqueue(mysg)// 将当前接收操作入接收队列  ...// 进行阻塞,等待唤醒  gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv,2)...}

5、close关闭

close

函数主要用于

channel

的关闭,Go编译器会替换成

closechan

函数的调用。省略一些检查下的代码后,

closechan

函数的主要逻辑如下:

  • 如果当前channil,则直接panic
  • 如果当前chan已关闭,再次close则直接panic
  • 如果chan不为nilchan也没有closed,就把等待队列中的 sender(writer)receiver(reader)从队列中全部移除并唤醒。
funcclosechan(c *hchan){if c ==nil{// 若当前chan未nil,则直接panic  panic(plainError("close of nil channel"))}lock(&c.lock)// 加锁  if c.closed !=0{// 若当前chan已经关闭,则直接panic  unlock(&c.lock)panic(plainError("close of closed channel"))}...  
  
    c.closed =1// 设置当前channel的状态为已关闭  var glist gList  
  
    // 释放接收队列中所有的reader  for{  
       sg := c.recvq.dequeue()if sg ==nil{break}if sg.elem !=nil{typedmemclr(c.elemtype, sg.elem)  
          sg.elem =nil}if sg.releasetime !=0{  
          sg.releasetime =cputicks()}  
       gp := sg.g  
       gp.param = unsafe.Pointer(sg)  
       sg.success =falseif raceenabled {raceacquireg(gp, c.raceaddr())}  
       glist.push(gp)}// 释放发送队列中所有的writer (它们会panic)  for{  
       sg := c.sendq.dequeue()if sg ==nil{break}  
       sg.elem =nilif sg.releasetime !=0{  
          sg.releasetime =cputicks()}  
       gp := sg.g  
       gp.param = unsafe.Pointer(sg)  
       sg.success =falseif raceenabled {raceacquireg(gp, c.raceaddr())}  
       glist.push(gp)}unlock(&c.lock)for!glist.empty(){  
       gp := glist.pop()  
       gp.schedlink =0goready(gp,3)}}

三、总结

通过学习

channel

的基本使用,了解其操作背后的实现原理,可以帮助我们更好的使用

channel

,避免一些操作不当而导致的

panic

或者说是

bug

,让我们在使用

channel

时能够更加的得心应手。

channel

的值和状态有多种情况,而不同的操作

(send、recv、close)

又可能得到不同的结果,这是使用

channel

类型时需要经常注意的点,我们可以将不同

channel

值下的不同操作进行一个总结,**特别注意操作

channel

时会产生

panic

的情况,已经可能会导致线程阻塞的情况**,都是有可能导致死锁与

goroutine

泄漏的罪魁祸首。
channel执行操作\channel状态channel为nilchannel buf为空channel buf已满channel buf未满且不为空channel已关闭

receive

接收操作阻塞阻塞读取数据读取数据返回buf中缓存的数据

send

发送操作阻塞写入数据阻塞写入数据panic

close

关闭panic关闭channel,buf中没有缓存数据关闭channel,保留已缓存的数据关闭channel,保留已缓存的数据panic

又出成绩啦

我们又出成绩啦!大厂Offer集锦!遥遥领先!

这些朋友赢麻了!

这是一个专注程序员升职加薪の知识星球

答疑解惑

需要「简历优化」、「就业辅导」、「职业规划」的朋友可以联系我。

加我微信:wangzhongyang1993

关注我的同名公众号:王中阳Go


本文转载自: https://blog.csdn.net/w425772719/article/details/136206470
版权归原作者 王中阳Go 所有, 如有侵权,请联系我们删除。

“通俗易懂剖析Go Channel:理解并发通信的核心机制”的评论:

还没有评论