使用的go版本为 go1.21.2
首先我们写一个简单的chan调度代码
package main
import "fmt"
func main() {
ch := make(chan struct{})
go func() {
ch <- struct{}{}
ch <- struct{}{}
}()
fmt.Println("xiaochuan", <-ch)
data, ok := <-ch
fmt.Println("xiaochuan", data, ok)
close(ch)
}
因为ch的数据获取方式有两种,所以这个示例代码写了两次的ch读与写
老样子通过go build -gcflags -S main.go获取到对应的汇编代码
调度make最终被转换为CALL runtime.makechan
调度ch <- struct{}{}最终被转换为CALL runtime.chansend1 由于我们调度了两次所以这里有两个
调度 <-ch 最终被转换为CALL runtime.chanrecv1
我们还进行一次两个参数的调度接收ch读取
data, ok := <-ch最终被转换为CALL runtime.chanrecv2
调度 close(ch) 最终被转换为CALL runtime.closechan 先来看一下hchan构造体相关的底层源码
hchan结构体
//代码位于 GOROOT/src/runtime/chan.go L:33
type hchan struct {
qcount uint // 环形队列中元素个数
dataqsiz uint // 环形队列的大小
buf unsafe.Pointer // 指向大小为 dataqsiz 的数组
elemsize uint16 // 元素大小
closed uint32 // 是否关闭
elemtype *_type // 元素类型
sendx uint // 发送索引
recvx uint // 接收索引
recvq waitq // recv 等待列表,即( <-ch )
sendq waitq // send 等待列表,即( ch<- )
lock mutex // 锁
}
type waitq struct { // 等待队列 sudog 双向队列
first *sudog
last *sudog
}
type sudog struct {
// 下面的字段由 sudog 阻塞的 channel 的 hchan.lock 保护。
// shrinkstack 依赖这个字段来处理参与 channel 操作的 sudog。
g *g
next *sudog
prev *sudog
elem unsafe.Pointer // 数据元素(可能指向堆栈)
// 下面的字段在任何情况下都不会并发访问。
// 对于 channels,waitlink 只有 g 访问。
// 对于 semaphores,所有字段(包括上面的字段)
// 仅在持有 semaRoot 锁时才会访问。
acquiretime int64
releasetime int64
ticket uint32
// isSelect 表示 g 参与了 select,因此 g.selectDone 必须进行 CAS 操作以赢得唤醒竞争。
isSelect bool
// success 表示通信是否成功。如果 goroutine 被唤醒是因为在通道 c 上传递了值,则为 true,
// 如果是因为 c 被关闭而唤醒,则为 false。
success bool
parent *sudog // semaRoot 二叉树
waitlink *sudog // g.waiting 列表或 semaRoot
waittail *sudog // semaRoot
c *hchan // channel
}
先从创建chan开始
makechan源码与解读
//代码位于 GOROOT/src/runtime/chan.go L:65
//如果我们make的初始化缓冲区比较大会调度这个函数
func makechan64(t *chantype, size int64) *hchan {
//将size强转为int类型
//因为go的int类型的大小在不同平台上可能是 32 位或 64 位
//如果大小超过了当前平台int最大值,会截断掉超出最大值的部分
if int64(int(size)) != size {
panic(plainError("makechan: size out of range"))
}
//强制转换为int类型超出int部分截断
return makechan(t, int(size))
}
func makechan(t *chantype, size int) *hchan {
elem := t.Elem
//编辑器检测元素的大小会不会大于2的16次方,对齐方式
if elem.Size_ >= 1<<16 {
throw("makechan: invalid channel element type")
}
if hchanSize%maxAlign != 0 || elem.Align_ > maxAlign {
throw("makechan: bad alignment")
}
//检测内存大小,会不会有溢出的情况
mem, overflow := math.MulUintptr(elem.Size_, uintptr(size))
if overflow || mem > maxAlloc-hchanSize || size < 0 {
panic(plainError("makechan: size out of range"))
}
//初始化hchan
var c *hchan
switch {
case mem == 0: //队列或元素大小为零
// Queue or element size is zero.
c = (*hchan)(mallocgc(hchanSize, nil, true))
// Race detector uses this location for synchronization.
c.buf = c.raceaddr()
case elem.PtrBytes == 0: //元素不包含指针(在调用中分配 hchan 和 buf)
// Elements do not contain pointers.
// Allocate hchan and buf in one call.
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
c.buf = add(unsafe.Pointer(c), hchanSize)
default: //元素包含指针
// Elements contain pointers.
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}
//填充元素大小、元素类型、数据环形队列的大小
c.elemsize = uint16(elem.Size_)
c.elemtype = elem
c.dataqsiz = uint(size)
lockInit(&c.lock, lockRankHchan)
if debugChan { //开启debug开关,公屏打印
print("makechan: chan=", c, "; elemsize=", elem.Size_, "; dataqsiz=", size, "\n")
}
return c
}
chansend1源码与解读
//代码位于 GOROOT/src/runtime/chan.go L:142
//c <- x 调度这个函数
func chansend1(c *hchan, elem unsafe.Pointer) {
chansend(c, elem, true, getcallerpc())
}
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
if c == nil { //判断当前ch是不是一个空指针,如果为空将当前G休眠,触发崩溃
if !block {
return false
}
gopark(nil, nil, waitReasonChanSendNilChan, traceBlockForever, 2)
throw("unreachable")
}
if debugChan { //开启debug开关,公屏打印
print("chansend: chan=", c, "\n")
}
if raceenabled {//竞争开启
racereadpc(c.raceaddr(), callerpc, abi.FuncPCABIInternal(chansend))
}
//在无锁的情况下,检测一下是否ch 是否关闭,是否会造成阻塞
if !block && c.closed == 0 && full(c) {
return false
}
var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}
lock(&c.lock) //获取chan锁
if c.closed != 0 { // 二次确认chan是不是已经关闭
unlock(&c.lock)
panic(plainError("send on closed channel"))
}
//判断当前ch是否存在接收方
//如果存在直接调用send函数将数据发送给对方,避免数据复制到缓存区中去
if sg := c.recvq.dequeue(); sg != nil {
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
//判断当前ch元素个数是否小于队列的长度
//如果有剩余空间将数据将要发送的元素加入队列
if c.qcount < c.dataqsiz {
// 获取环形队列中的元素
qp := chanbuf(c, c.sendx)
if raceenabled {
racenotify(c, c.sendx, nil)
}
// 直接ep复制给qp
typedmemmove(c.elemtype, qp, ep)
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
unlock(&c.lock)
return true
}
if !block {
unlock(&c.lock)
return false
}
gp := getg() //获取当前G
//获取一个sudog, 优先从P中获取
//如果P中的sudog缓存区(本地无锁)为空
//从调度器层的sudog缓冲区(全局需要加锁)中拿数据放入P的sudog缓存区
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
//将sudog写入send环形队列中去
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.waiting = mysg
gp.param = nil
c.sendq.enqueue(mysg)
//将当前G的parkingOnChan设置为true(表示目前停止在了chansend或chanrecv上)
//将当前的G移出调度队列(调度chanparkcommit解锁当前ch)
gp.parkingOnChan.Store(true)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceBlockChanSend, 2)
//调度KeepAlive函数确保发送的元素处于一个可达的状态避免被回收
KeepAlive(ep)
//当前后续唤醒G
//判断G的等待列表是否为当前的sudog
//如果不一致说明G已经被改写了
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
//清空G的等待队列,
//获取当前被唤醒的原因sudog.succes
//因为唤醒方式有两种,1。通道关闭 2.接收唤起
gp.waiting = nil
gp.activeStackChans = false
closed := !mysg.success
gp.param = nil //清空G的参数列表
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
mysg.c = nil
releaseSudog(mysg) //释放sudog重新放回P的sudogcache(本地)
if closed { //由于不能写入关闭的chan,所以直接异常了
if c.closed == 0 {
throw("chansend: spurious wakeup")
}
panic(plainError("send on closed channel"))
}
return true
}
直接发送的时候调用的send函数解读如下
send源码与解读
//代码位于 GOROOT/src/runtime/chan.go L:295
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
if raceenabled {
if c.dataqsiz == 0 {
racesync(c, sg)
} else {
// Pretend we go through the buffer, even though
// we copy directly. Note that we need to increment
// the head/tail locations only when raceenabled.
racenotify(c, c.recvx, nil)
racenotify(c, c.recvx, sg)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
}
}
// 检测数据是否为空
// 如果不为空直接调用sendDirect函数发送数据,然后将其重置为nil
if sg.elem != nil {
sendDirect(c.elemtype, sg, ep)
sg.elem = nil
}
//获取等待列表中的G,
//将当前的ch解锁, sugo赋值为G当做启动参数
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
sg.success = true
//sugo判断释放时间是否为0
//为0将其设置为当前 CPU 的时钟滴答数
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
//将G标记为可运行状态,放入调度队列等待被后续调度
goready(gp, skip+1)
}
chanrecv1与chanrecv2源码与解读
//代码位于 GOROOT/src/runtime/chan.go L:442
//chanrecv1与chanrecv2的处理逻辑基本差不多
//chanrecv2多接受了一个变量而已
//可以理解为这样ok := chanrecv2(ch, v)
func chanrecv1(c *hchan, elem unsafe.Pointer) {
chanrecv(c, elem, true)
}
func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
_, received = chanrecv(c, elem, true)
return
}
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
if debugChan {//开启debug开关,公屏打印
print("chanrecv: chan=", c, "\n")
}
if c == nil {//判断当前ch是不是为空指针,如果为空将当前G休眠,触发崩溃
if !block {
return
}
gopark(nil, nil, waitReasonChanReceiveNilChan, traceBlockForever, 2)
throw("unreachable")
}
if !block && empty(c) {//非阻塞情况下, 且数据队列为空
if atomic.Load(&c.closed) == 0 { //原子读取 当前ch是否关闭,如果关闭直接返回
// Because a channel cannot be reopened, the later observation of the channel
// being not closed implies that it was also not closed at the moment of the
// first observation. We behave as if we observed the channel at that moment
// and report that the receive cannot proceed.
return
}
if empty(c) {// 重新检测是否为空ch
// The channel is irreversibly closed and empty.
if raceenabled {
raceacquire(c.raceaddr())
}
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
}
var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}
lock(&c.lock) //获取chan锁
if c.closed != 0 { // 二次确认ch是不是已经关闭
if c.qcount == 0 {
if raceenabled {
raceacquire(c.raceaddr())
}
unlock(&c.lock)
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
} else {
// 判断当前ch是否存在发送方
// 如果存在直接调用recv函数将数据接受对方的数据
if sg := c.sendq.dequeue(); sg != nil {
// Found a waiting sender. If buffer is size 0, receive value
// directly from sender. Otherwise, receive from head of queue
// and add sender's value to the tail of the queue (both map to
// the same buffer slot because the queue is full).
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}
}
//环形队列中存在数据,直接从队列中接收,传递给接受者
if c.qcount > 0 {
// 获取环形队列中的元素
qp := chanbuf(c, c.recvx)
if raceenabled {
racenotify(c, c.recvx, nil)
}
if ep != nil {
// 直接qp复制给ep
typedmemmove(c.elemtype, ep, qp)
}
//清除数据
typedmemclr(c.elemtype, qp)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
unlock(&c.lock)
return true, true
}
if !block {
unlock(&c.lock)
return false, false
}
gp := getg()//获取当前G
//获取一个sudog, 优先从P中获取
//如果P中的sudog缓存区(本地无锁)为空
//从调度器层的sudog缓冲区(全局需要加锁)中拿数据放入P的sudog缓存区
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
//将sudog写入recvq环形队列中去
mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.param = nil
c.recvq.enqueue(mysg)
//将当前G的parkingOnChan设置为true(表示目前停止在了chansend或chanrecv上)
//将当前的G移出调度队列(调度chanparkcommit解锁当前ch)
gp.parkingOnChan.Store(true)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceBlockChanRecv, 2)
//当前后续唤醒G
//判断G的等待列表是否为当前的sudog
//如果不一致说明G已经被改写了
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
//清空G的等待队列,
//获取当前被唤醒的原因sudog.succes
//因为唤醒方式有两种,1。通道关闭 2.发送唤起
gp.waiting = nil
gp.activeStackChans = false
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
success := mysg.success
gp.param = nil
mysg.c = nil
releaseSudog(mysg)//释放sudog重新放回P的sudogcache(本地)
return true, success
}
直接读取的时候调用的recv函数解读如下
recv源码与解读
//代码位于 GOROOT/src/runtime/chan.go L:616
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
//判断当前环形队列是否为0
//为0从发送方复制数据(调度recvDirect函数)
if c.dataqsiz == 0 {
if raceenabled {
racesync(c, sg)
}
if ep != nil {
// copy data from sender
recvDirect(c.elemtype, sg, ep)
}
} else {
// 获取环形队列中的元素
qp := chanbuf(c, c.recvx)
if raceenabled {
racenotify(c, c.recvx, nil)
racenotify(c, c.recvx, sg)
}
// 如果数据不为空 直接ep复制给qp
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
// 清除数据
typedmemmove(c.elemtype, qp, sg.elem)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
}
//获取等待列表中的G,
//将当前的ch解锁, sugo赋值为G当做启动参数
sg.elem = nil
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
sg.success = true
//sugo判断释放时间是否为0
//为0将其设置为当前 CPU 的时钟滴答数
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
//将G标记为可运行状态,放入调度队列等待被后续调度
goready(gp, skip+1)
}
closechan源码与解读
//代码位于 GOROOT/src/runtime/chan.go L:358
func closechan(c *hchan) {
if c == nil {//如果ch未初始化直接报错
panic(plainError("close of nil channel"))
}
lock(&c.lock) //获取chan锁
if c.closed != 0 { //如果当前ch已经处于关闭状态,触发异常
unlock(&c.lock)
panic(plainError("close of closed channel"))
}
if raceenabled { //竞争开启
callerpc := getcallerpc()
racewritepc(c.raceaddr(), callerpc, abi.FuncPCABIInternal(closechan))
racerelease(c.raceaddr())
}
c.closed = 1 //将当前ch设置为关闭状态
//待唤醒的G列表
var glist gList
// release all readers
for {
//逐步从读取队列取值,直到获取完为止
sg := c.recvq.dequeue()
if sg == nil {
break
}
//数据不为空,释放掉对应的内存块
if sg.elem != nil {
typedmemclr(c.elemtype, sg.elem)
sg.elem = nil
}
// 重置释放时间
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
// 获取对应的G, 重置唤醒参数
// 将这个G加入到glist中等待后续唤醒
gp := sg.g
gp.param = unsafe.Pointer(sg)
sg.success = false
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
glist.push(gp)
}
for {
//逐步从发送队列取值,直到获取完为止 (向关闭的ch发送数据会有panic)
sg := c.sendq.dequeue()
if sg == nil {
break
}
sg.elem = nil
// 重置释放时间
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
// 获取对应的G, 重置唤醒参数
// 将这个G加入到glist中等待后续唤醒
gp := sg.g
gp.param = unsafe.Pointer(sg)
sg.success = false
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
glist.push(gp)
}
unlock(&c.lock)
// 循环glist待唤醒列表将G设置为read状态(唤醒G运行干活)
for !glist.empty() {
gp := glist.pop()
gp.schedlink = 0
goready(gp, 3)
}
}
总结
我们从上面的源码分析了解chan的数据结构、发送数据、接收数据和关闭这些基本操作,从源码分析我们得知chan的读写操作是会上锁的,如果业务中对性能要求比较高的情况下chan的这把锁会成为我们系统内的瓶颈。
版权归原作者 一名路过的小码农 所有, 如有侵权,请联系我们删除。