本文来自 Go就业训练营 小韬同学的投稿。
也强烈安利大家多写博客,不仅能倒逼自己学习总结,也能作为简历的加分项,提高求职面试的竞争力。
你想想看:面试官看到你简历中的博客主页有几十篇文章,几千粉丝是什么感觉。要比你空洞洞的写一句“热爱技术”强太多啦!
正文
我们在学习与使用Go语言的过程中,对
channel
并不陌生,
channel
是Go语言与众不同的特性之一,也是非常重要的一环,深入理解
Channel
,相信能够在使用的时候更加的得心应手。
一、Channel基本用法
1、channel类别
channel
在类型上,可以分为两种:
- 双向channel:既能接收又能发送的
channel
- 单向channel:只能发送或只能接收的
channel
,即单向channel
可以为分为: -只写channel
-只读channel
声明并初始化如下如下:
funcmain(){// 声明并初始化 var ch chanstring=make(chanstring)// 双向channel var readCh <-chanstring=make(<-chanstring)// 只读channel var writeCh chan<-string=make(chan<-string)// 只写channel }
上述定义中,
<-
表示单向的
channel
。如果箭头指向
chan
,就表示只写
channel
,可以往
chan
里边写入数据;如果箭头远离
chan
,则表示为只读
channel
,可以从
chan
读数据。
在定义channel时,可以定义任意类型的channel,因此也同样可以定义chan类型的channel。例如:
a :=make(chan<-chanint)// 定义类型为 chan int 的写channel
b :=make(chan<-<-chanint)// 定义类型为 <-chan int 的写channel
c :=make(<-chan<-chanint)// 定义类型为 <-chan int 的读channel
d :=make(chan(<-chanint))// 定义类型为 (<-chan int) 的读channel
当
channel
未初始化时,其**零值为
nil
。nil 是 chan 的零值,是一种特殊的 chan,对值是 nil 的 chan 的发送接收调用者总是会阻塞。**
funcmain(){var ch chanstring
fmt.Println(ch)// <nil> }
通过
make
我们可以初始化一个channel,并且可以设置其容量的大小,如下初始化了一个类型为
string
,其容量大小为
512
的
channel
:
var ch chanstring=make(chanstring,512)
当初始化定义了
channel
的容量,则这样的
channel
叫做
buffered chan
,即**有缓冲
channel
**。如果没有设置容量,
channel
的容量为0,这样的
channel
叫做
unbuffered chan
,即**无缓冲
channel
**。
有缓冲
channel
中,如果
channel
中还有数据,则从这个
channel
接收数据时不会被阻塞。如果
channel
的容量还未满,那么向这个
channel
发送数据也不会被阻塞,反之则会被阻塞。
无缓冲
channel
则只有当读写操作都准备好后,才不会阻塞,这也是
unbuffered chan
在使用过程中非常需要注意的一点,否则可能会出现常见的bug。
channel的常见操作:
- 发送数据
往channel发送一个数据使用
ch <-
funcmain(){var ch chanint=make(chanint,512)
ch <-2000}
上述的
ch
可以是
chan int
类型,也可以是单向
chan <-int
。
- 接收数据
从channel接收一条数据可以使用
<-ch
funcmain(){var ch chanint=make(chanint,512)
ch <-2000// 发送数据
data :=<-ch // 接收数据
fmt.Println(data)// 2000 }
ch 类型是
chan T
,也可以是单向
<-chan T
在接收数据时,可以返回两个返回值。**第一个返回值返回
channel
中的元素,第二个返回值为
bool
类型**,表示是否成功地从
channel
中读取到一个值。
如果第二个参数是
false
,则**表示
channel
已经被
close
而且
channel
中没有缓存的数据**,这个时候第一个值返回的是零值。
funcmain(){var ch chanint=make(chanint,512)
ch <-2000// 发送数据
data1, ok1 :=<-ch // 接收数据
fmt.Printf("data1 = %d, ok1 = %t\n", data1, ok1)// data1 = 2000, ok1 = true close(ch)// 关闭channel
data2, ok2 :=<-ch // 接收数据
fmt.Printf("data2 = %d, ok2 = %t", data2, ok2)// data2 = 0, ok2 = false }
所以,如果从
channel
读取到一个零值,可能是发送操作真正发送的零值,也可能是
closed
关闭
channel
并且
channel
没有缓存元素产生的零值,这是需要注意判别的一个点。
- 其他操作
Go内建的函数
close
、
cap
、
len
都可以对
chan
类型进行操作。
close
:关闭channel。cap
:返回channel的容量。len
:返回channel缓存中还未被取走的元素数量。
funcmain(){var ch chanint=make(chanint,512)
ch <-100
ch <-200
fmt.Println("ch len:",len(ch))// ch len: 2
fmt.Println("ch cap:",cap(ch))// ch cap: 512 }
发送操作与接收操作可以作为
select
语句中的
case clause
,例如:
funcmain(){var ch =make(chanint,512)for i :=0; i <10; i++{select{case ch <- i:case v :=<-ch:
fmt.Println(v)}}}
for-range
语句同样可以在
chan
中使用,例如:
funcmain(){var ch =make(chanint,512)
ch <-100
ch <-200
ch <-300for v :=range ch {
fmt.Println(v)}}// 执行结果 100200300
2、select介绍
在Go语言中,
select
语句用于监控一组
case
语句,根据特定的条件执行相对应的
case
语句或
default
语句,与
switch
类似,但不同之处在于
select
语句中所有
case
中的表达式都必须是
channel
的发送或接收操作。
select
使用示例代码如下:
select{case<-ch1:
fmt.Println("ch1")case ch2 <-1:
fmt.Println("ch2")}
上述代码中,
select
关键字让当前
goroutine
同时等待
ch1
的可读和
ch2
的可写,在满足任意一个
case
分支之前,
select
会一直阻塞下去,直到其中的一个
channel
转为就绪状态时执行对应
case
分支的代码。如果多个
channel
同时就绪的话则随机选择一个
case
执行。
当使用空
select
时,空的
select
语句会直接阻塞当前的
goroutine
,使得该
goroutine
进入无法被唤醒的永久休眠状态。空
select
,即
select
内不包含任何
case
。
select{
}
另外当
select
语句内只有一个
case
分支时,如果该
case
分支不满足,那么当前
select
就变成了一个阻塞的
channel
读/写操作。
select{case<-ch1:
fmt.Println("ch1")}
上述
select
中,当
ch1
可读时,会执行打印操作,反之则阻塞当前
goroutine
。
当
select
语句内包含
default
分支时,如果
select
内的所有
case
都不满足,则会执行
default
分支的逻辑,用于当其他
case
都不满足时执行一些默认操作。
select{case<-ch1:
fmt.Println("ch1")case ch2 <-1:
fmt.Println("ch2")default:
fmt.Println("default")}
上述代码中,当
ch1
可读或
ch2
可写时,会执行相应的打印操作,否则就执行
default
语句中的代码,相当于一个非阻塞的
channel
读取操作。
select
的使用可以总结为:
select
不存在任何的case
且没有default
分支:永久阻塞当前 goroutine;select
只存在一个case
且没有default
分支:阻塞的发送/接收;select
存在多个case
:随机选择一个满足条件的case
执行;select
存在default
,其他case
都不满足时:执行default
语句中的代码;
二、Channel实现原理
从代码的角度剖析
channel
的实现,能够让我们更好的去使用
channel
。
我们可以从
chan
类型的数据结构、初始化以及三个操作发送、接收和关闭这几个方面来了解
channel
。
1、chan数据结构
chan类型的数据结构定义位于runtime.hchan,其结构体定义如下:
type hchan struct{
qcount uint// total data in the queue
dataqsiz uint// size of the circular queue
buf unsafe.Pointer // points to an array of dataqsiz elements
elemsize uint16
closed uint32
elemtype *_type // element type
sendx uint// send index
recvx uint// receive index
recvq waitq // list of recv waiters
sendq waitq // list of send waiters // lock protects all fields in hchan, as well as several // fields in sudogs blocked on this channel. // // Do not change another G's status while holding this lock // (in particular, do not ready a G), as this can deadlock // with stack shrinking.
lock mutex
}
解释一下上述各个字段的意义:
qcount
:表示chan
中已经接收到的数据且还未被取走的元素个数。内建函数len
可以返回这个字段的值。datasiz
:循环队列的大小。chan
在实现上使用一个循环队列来存放元素的个数,循环队列适用于生产者-消费者的场景。buf
:存放元素的循环队列buffer
,buf
字段是一个指向队列缓冲区的指针,即指向一个dataqsiz
元素的数组。buf
字段是使用unsafe.Pointer
类型来表示队列缓冲区的起始地址。unsafe.Pointer
是一种特殊的指针类型,它可以用于指向任何类型的数据。由于队列缓冲区的类型是动态分配的,所以不能直接使用某个具体类型的指针来表示。elemtype
、elemsize
:elemtype
表示chan中元素的数据类型,elemsize
表示其大小。当chan定义后,它的元素类型是固定的,即普通类型或者指针类型,因此元素大小也是固定的。sendx
:处理发送数据操作的指针在buf队列中的位置。当channel接收到了新的数据时,该指针就会加上elemsize
,移动到下一个位置。buf
的总大小是elemsize
的整数倍且buf
是一个循环列表。recvx
:处理接收数据操作的指针在buf
队列中的位置。当从buf中取出数据,此指针会移动到下一个位置。recvq
:当接收操作发现channel
中没有数据可读时,会被则色,此时会被加入到recvq
队列中。sendq
:当发送操作发现buf
队列已满时,会被进行阻塞,此时会被加入到sendq
队列中。
2、chan初始化
channel
在进行初始化时,Go编译器会根据是否传入容量的大小,来选择调用
makechan64
,还是
makechan
。
makechan64
在实现上底层还是调用
makechan
来进行初始化,
makechan64
只是对
size
做了检查。
makechan
函数根据
chan
的容量的大小和元素的类型不同,初始化不同的存储空间。省略一些检查代码,
makechan
函数的主要逻辑如下:
funcmakechan(t *chantype, size int)*hchan {
elem := t.elem
...
mem, overflow := math.MulUintptr(elem.size,uintptr(size))...var c *hchan
switch{case mem ==0:// 队列或元素大小为零,不必创建buf
c =(*hchan)(mallocgc(hchanSize,nil,true))
c.buf = c.raceaddr()case elem.ptrdata ==0:// 元素不包含指针,分配一块连续的内存给hchan数据结构和buf // hchan数据结构后面紧接着就是buf,在一次调用中分配hchan和buf
c =(*hchan)(mallocgc(hchanSize+mem,nil,true))
c.buf =add(unsafe.Pointer(c), hchanSize)default:// 元素包含指针,单独分配buf
c =new(hchan)
c.buf =mallocgc(mem, elem,true)}// 记录元素大小、类型、容量
c.elemsize =uint16(elem.size)
c.elemtype = elem
c.dataqsiz =uint(size)lockInit(&c.lock, lockRankHchan)...return c
}
3、send发送操作
Go在编译发送数据给
channel
时,会把发送操作
send
转换成
chansend1
函数,而
chansend1
函数会调用
chansend
函数。
funcchansend1(c *hchan, elem unsafe.Pointer){chansend(c, elem,true,getcallerpc())}
我们可以来分段分析
chansend
函数的实现逻辑。
第一部分:
主要是对
chan
进行判断,判断
chan
是否为
nil
,若为
nil
,则判断是否需要将当前
goroutine
进行阻塞,阻塞通过
gopark
来对调用者
goroutine park
(阻塞休眠)。
funcchansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr)bool{// 第一部分 if c ==nil{// 判断chan是否为nil if!block {// 判断是否需要阻塞当前goroutine returnfalse}// 调用这goroutine park,进行阻塞休眠 gopark(nil,nil, waitReasonChanSendNilChan, traceEvGoStop,2)throw("unreachable")}...}
第二部分
第二部分的逻辑判断是当你往一个容量已满的
chan
实例发送数据,且不想当前调用的
goroutine
被阻塞时(
chan
未被关闭),那么处理的逻辑是直接返回。
funcchansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr)bool{...// 第二部分 if!block && c.closed ==0&&full(c){returnfalse}...}
第三部分
第三部分的逻辑判断是首先进行互斥锁加锁,然后判断当前
chan
是否关闭,如果
chan
已经被
close
了,则释放互斥锁并
panic
,即对已关闭的
chan
发送数据会
panic
。
funcchansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr)bool{...// 第三部分 lock(&c.lock)// 开始加锁 if c.closed !=0{// 判断channel是否关闭 unlock(&c.lock)panic(plainError("send on closed channel"))}...}
第四部分
第四部分的逻辑主要是判断接收队列中是否有正在等待的接收方
receiver
。如果存在正在等待的
receiver
(说明此时
buf
中没有缓存的数据),则将他从接收队列中弹出,直接将需要发送到
channel
的数据交给这个
receiver
,而无需放入到
buf
中,让发送操作速度更快一些。
funcchansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr)bool{...// 第四部分 if sg := c.recvq.dequeue(); sg !=nil{// 找到了一个正在等待的接收者。我们传递我们想要发送的值 // 直接传递给receiver接收者,绕过channel buf缓存区(如果receiver有的话) send(c, sg, ep,func(){unlock(&c.lock)},3)returntrue}...}
第五部分
当等待队列中并没有正在等待的
receiver
,则说明当前
buf
还没有满,此时将发送的数据放入到
buf
中。
funcchansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr)bool{...// 第五部分 if c.qcount < c.dataqsiz {// 判断buf是否满了 // channel buf还有可用的空间. 将发送数据入buf循环队列.
qp :=chanbuf(c, c.sendx)if raceenabled {racenotify(c, c.sendx,nil)}typedmemmove(c.elemtype, qp, ep)
c.sendx++if c.sendx == c.dataqsiz {
c.sendx =0}
c.qcount++unlock(&c.lock)returntrue}...}
第六部分
当逻辑走到第六部分,说明正在处理
buf
已满的情况。如果
buf
已满,则发送操作的
goroutine
就会加入到发送者的等待队列,直到被唤醒。当
goroutine
被唤醒时,数据或者被取走了,或者
chan
已经被关闭了。
funcchansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr)bool{...// 第六部分 // chansend1函数调用不会进入if块里,因为chansend1的block=true if!block {unlock(&c.lock)returnfalse}...
c.sendq.enqueue(mysg)// 加入发送队列 ...gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend,2)// 阻塞 ...}
4、recv接收操作
从
channel
中接收数据时,Go会将代码转换成
chanrecv1
函数。如果需要返回两个返回值,则会转换成
chanrecv2
,
chanrecv1
函数和
chanrecv2
都会调用
chanrecv
函数。
chanrecv1
和
chanrecv2
传入的
block
参数的值是
true
,两种调用都是阻塞方式,因此在分析
chanrecv
函数的实现时,可以不考虑
block=false
的情况。
// 从已编译代码中进入 <-c 的入口点 funcchanrecv1(c *hchan, elem unsafe.Pointer){chanrecv(c, elem,true)}funcchanrecv2(c *hchan, elem unsafe.Pointer)(received bool){_, received =chanrecv(c, elem,true)return}
同样,省略一些检查类的代码,我们也可以分段分析
chanrecv
函数的逻辑。
第一部分
第一部分主要判断当前进行接收操作的
chan
实例是否为
nil
,若为
nil
,则从
nil chan
中接收数据的调用这
goroutine
会被阻塞。
funcchanrecv(c *hchan, ep unsafe.Pointer, block bool)(selected, received bool){...// 第一部分 if c ==nil{// 判断chan是否为nil if!block {// 是否阻塞,默认为block=true return}// 进行阻塞 gopark(nil,nil, waitReasonChanReceiveNilChan, traceEvGoStop,2)throw("unreachable")}...}
第二部分
这一部分只要是考虑
block=false
且
c
为空的情况,
block=false
的情况我们可以不做考虑。
funcchanrecv(c *hchan, ep unsafe.Pointer, block bool)(selected, received bool){...// 检查未获得锁的失败非阻塞操作。 if!block &&empty(c){...}...}
第三部分
第三部分的逻辑为判断当前
chan
是否被关闭,若当前
chan
已经被
close
了,并且缓存队列中没有缓冲的元素时,返回
true
、
false
。
funcchanrecv(c *hchan, ep unsafe.Pointer, block bool)(selected, received bool){...lock(&c.lock)// 加锁,返回时释放锁 // 第三部分 if c.closed !=0{// 当chan已被关闭时 if c.qcount ==0{// 且 buf区 没有缓存的数据了 ...unlock(&c.lock)// 解锁 if ep !=nil{typedmemclr(c.elemtype, ep)}returntrue,false}}...}
第四部分
第四部分是处理通道未关闭且
buf
缓存队列已满的情况。只有当缓存队列已满时,才能够从发送等待队列获取到
sender
。若当前的
chan
为
unbuffer
的
chan
,即
无缓冲区channel
时,则直接将
sender
的发送数据传递给
receiver
。否则就从缓存队列的头部读取一个元素值,并将获取的
sender
携带的值加入到
buf
循环队列的尾部。
funcchanrecv(c *hchan, ep unsafe.Pointer, block bool)(selected, received bool){...if c.closed !=0{// 当chan已被关闭时 }else{// 第四部分,通道未关闭 // 如果sendq队列中有等待发送的sender if sg := c.sendq.dequeue(); sg !=nil{// 存在正在等待的sender,如果缓存区的容量为0则直接将发送方的值传递给接收方 // 反之,则从缓存队列的头部获取数据,并将获取的sender的发送值加入到缓存队列尾部 recv(c, sg, ep,func(){unlock(&c.lock)},3)returntrue,true}}...}
第五部分
第五部分的主要逻辑是处理发送队列中没有等待的
sender
且
buf
中有缓存的数据。该段逻辑与外出的互斥锁共用一把锁,因此不存在并发问题。当
buf
缓存区有缓存元素时,则取出该元素传递给
receiver
,同时移动接收指针。
funcchanrecv(c *hchan, ep unsafe.Pointer, block bool)(selected, received bool){...// 第五部分 if c.qcount >0{// 发送队列中没有等待的sender,且buf中有缓存数据 // 直接从缓存队列中获取数据
qp :=chanbuf(c, c.recvx)if raceenabled {racenotify(c, c.recvx,nil)}if ep !=nil{typedmemmove(c.elemtype, ep, qp)}typedmemclr(c.elemtype, qp)
c.recvx++// 移动接收指针 if c.recvx == c.dataqsiz {// 指针若已到末尾则进行重置(循环队列)
c.recvx =0}
c.qcount--// 获取数据后,buf缓存区元素个数减一 unlock(&c.lock)// 解锁 returntrue,true}if!block {// block=true unlock(&c.lock)returnfalse,false}...}
第六部分
第六部分的逻辑主要是处理
buf
缓存区中没有缓存数据的情况。当
buf
缓存区没有缓存数据时,那么当前的
receiver
就会被阻塞,直到它从
sender
中接收了数据,或者是
chan
被
close
,才会返回。
funcchanrecv(c *hchan, ep unsafe.Pointer, block bool)(selected, received bool){...
c.recvq.enqueue(mysg)// 将当前接收操作入接收队列 ...// 进行阻塞,等待唤醒 gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv,2)...}
5、close关闭
close
函数主要用于
channel
的关闭,Go编译器会替换成
closechan
函数的调用。省略一些检查下的代码后,
closechan
函数的主要逻辑如下:
- 如果当前
chan
为nil
,则直接panic
- 如果当前
chan
已关闭,再次close
则直接panic
- 如果
chan
不为nil
,chan
也没有closed
,就把等待队列中的sender(writer)
和receiver(reader)
从队列中全部移除并唤醒。
funcclosechan(c *hchan){if c ==nil{// 若当前chan未nil,则直接panic panic(plainError("close of nil channel"))}lock(&c.lock)// 加锁 if c.closed !=0{// 若当前chan已经关闭,则直接panic unlock(&c.lock)panic(plainError("close of closed channel"))}...
c.closed =1// 设置当前channel的状态为已关闭 var glist gList
// 释放接收队列中所有的reader for{
sg := c.recvq.dequeue()if sg ==nil{break}if sg.elem !=nil{typedmemclr(c.elemtype, sg.elem)
sg.elem =nil}if sg.releasetime !=0{
sg.releasetime =cputicks()}
gp := sg.g
gp.param = unsafe.Pointer(sg)
sg.success =falseif raceenabled {raceacquireg(gp, c.raceaddr())}
glist.push(gp)}// 释放发送队列中所有的writer (它们会panic) for{
sg := c.sendq.dequeue()if sg ==nil{break}
sg.elem =nilif sg.releasetime !=0{
sg.releasetime =cputicks()}
gp := sg.g
gp.param = unsafe.Pointer(sg)
sg.success =falseif raceenabled {raceacquireg(gp, c.raceaddr())}
glist.push(gp)}unlock(&c.lock)for!glist.empty(){
gp := glist.pop()
gp.schedlink =0goready(gp,3)}}
三、总结
通过学习
channel
的基本使用,了解其操作背后的实现原理,可以帮助我们更好的使用
channel
,避免一些操作不当而导致的
panic
或者说是
bug
,让我们在使用
channel
时能够更加的得心应手。
channel
的值和状态有多种情况,而不同的操作
(send、recv、close)
又可能得到不同的结果,这是使用
channel
类型时需要经常注意的点,我们可以将不同
channel
值下的不同操作进行一个总结,**特别注意操作
channel
时会产生
panic
的情况,已经可能会导致线程阻塞的情况**,都是有可能导致死锁与
goroutine
泄漏的罪魁祸首。
channel执行操作\channel状态channel为nilchannel buf为空channel buf已满channel buf未满且不为空channel已关闭
receive
接收操作阻塞阻塞读取数据读取数据返回buf中缓存的数据
send
发送操作阻塞写入数据阻塞写入数据panic
close
关闭panic关闭channel,buf中没有缓存数据关闭channel,保留已缓存的数据关闭channel,保留已缓存的数据panic
又出成绩啦
我们又出成绩啦!大厂Offer集锦!遥遥领先!
这些朋友赢麻了!
这是一个专注程序员升职加薪の知识星球
答疑解惑
需要「简历优化」、「就业辅导」、「职业规划」的朋友可以联系我。
加我微信:wangzhongyang1993
关注我的同名公众号:王中阳Go
版权归原作者 王中阳Go 所有, 如有侵权,请联系我们删除。