0


channel源码解析

csp模型介绍

csp

模型是golang采用的共享内存模型,比对传统多线程共享内存采用lock、condition等方式来规定执行顺序的方式,golang里的csp更强调信道(channel),不关心信道发送、接收方是谁,双方通过信道收发信息。

ps:

csp

模型对应的

actor

模型,强调通信中的角色(actor),即收发双方,不看重通道。并且角色对外不提供任何接口访问,只约定通过通信异步交换信息,发送方必须知道接收方是谁,例如一个人要给另个人传递消息通过给对方的邮箱写信或者发短信。

channel

golang的channel实现了多线程并发安全的发送和接收方法,可以轻松实现多协程的生产消费模型。

源码解析(go1.19.3:src/runtime/chan.go)

数据结构定义

// 位于源码src/runtime/chan.gotype hchan struct{
    qcount   uint// total data in the queue
    dataqsiz uint// size of the circular queue
    buf      unsafe.Pointer // points to an array of dataqsiz elements
    elemsize uint16
    closed   uint32
    elemtype *_type // element type
    sendx    uint// send index
    recvx    uint// receive index
    recvq    waitq  // list of recv waiters
    sendq    waitq  // list of send waiters// lock protects all fields in hchan, as well as several// fields in sudogs blocked on this channel.//// Do not change another G's status while holding this lock// (in particular, do not ready a G), as this can deadlock// with stack shrinking.
    lock mutex
}

channel实现为一个固定大小的环形缓冲区,sendx、recvx表示缓冲区的头和尾,所以channel的收发可以一直复用这块缓冲区。

  • qcount: 缓冲区当前元素个数
  • dataqsiz: 缓冲区的容量
  • buf: 缓冲区(初始分配的元素数组)
  • elemsize: 元素的内存格式长度
  • closed: channel是否已经关闭
  • eletype: 元素的类型
  • sendx: 环形缓冲区的已写入元素最大索引
  • recvx: 环形缓冲区的已写入元素最小索引
  • recvq: 阻塞接收数据的协程队列(队列不为空表示channel无数据)
  • sendq: 阻塞发送数据的协程队列(队列不为空表示channel满)
  • lock: 一把互斥大锁

初始化channel

funcmakechan(t *chantype, size int)*hchan {
    elem := t.elem

    // compiler checks this but be safe.if elem.size >=1<<16{throw("makechan: invalid channel element type")}if hchanSize%maxAlign !=0|| elem.align > maxAlign {throw("makechan: bad alignment")}

    mem, overflow := math.MulUintptr(elem.size,uintptr(size))if overflow || mem > maxAlloc-hchanSize || size <0{panic(plainError("makechan: size out of range"))}// Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.// buf points into the same allocation, elemtype is persistent.// SudoG's are referenced from their owning thread so they can't be collected.// TODO(dvyukov,rlh): Rethink when collector can move allocated objects.var c *hchan
    switch{case mem ==0:// Queue or element size is zero.
        c =(*hchan)(mallocgc(hchanSize,nil,true))// Race detector uses this location for synchronization.
        c.buf = c.raceaddr()case elem.ptrdata ==0:// Elements do not contain pointers.// Allocate hchan and buf in one call.
        c =(*hchan)(mallocgc(hchanSize+mem,nil,true))
        c.buf =add(unsafe.Pointer(c), hchanSize)default:// Elements contain pointers.
        c =new(hchan)
        c.buf =mallocgc(mem, elem,true)}

    c.elemsize =uint16(elem.size)
    c.elemtype = elem
    c.dataqsiz =uint(size)lockInit(&c.lock, lockRankHchan)if debugChan {print("makechan: chan=", c,"; elemsize=", elem.size,"; dataqsiz=", size,"\n")}return c
}

1.校验channel元素类型大小不能超过64k、元素内存对齐系数不超过8字节;
2.校验缓冲区申请的长度是否合法;
3.根据channel类型分配不同的内存:阻塞通道(长度为0)就创建包含空buf的通道;非阻塞且结构体数据的通道(长度大于0,元素为struct{xxx})就创建hchan+长度*元素大小的内存;非阻塞且指针数据的通道(长度大于0,元素为ptr)就创建hchan和另一个长度大小的内存作为buf;

channel发送

funcchansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr)bool{if c ==nil{if!block {returnfalse}gopark(nil,nil, waitReasonChanSendNilChan, traceEvGoStop,2)throw("unreachable")}if debugChan {print("chansend: chan=", c,"\n")}if raceenabled {racereadpc(c.raceaddr(), callerpc, abi.FuncPCABIInternal(chansend))}if!block && c.closed ==0&&full(c){returnfalse}var t0 int64if blockprofilerate >0{
       t0 =cputicks()}lock(&c.lock)if c.closed !=0{unlock(&c.lock)panic(plainError("send on closed channel"))}if sg := c.recvq.dequeue(); sg !=nil{// Found a waiting receiver. We pass the value we want to send// directly to the receiver, bypassing the channel buffer (if any).send(c, sg, ep,func(){unlock(&c.lock)},3)returntrue}if c.qcount < c.dataqsiz {// Space is available in the channel buffer. Enqueue the element to send.
       qp :=chanbuf(c, c.sendx)if raceenabled {racenotify(c, c.sendx,nil)}typedmemmove(c.elemtype, qp, ep)
       c.sendx++if c.sendx == c.dataqsiz {
           c.sendx =0}
       c.qcount++unlock(&c.lock)returntrue}if!block {unlock(&c.lock)returnfalse}// Block on the channel. Some receiver will complete our operation for us.
   gp :=getg()
   mysg :=acquireSudog()
   mysg.releasetime =0if t0 !=0{
       mysg.releasetime =-1}// No stack splits between assigning elem and enqueuing mysg// on gp.waiting where copystack can find it.
   mysg.elem = ep
   mysg.waitlink =nil
   mysg.g = gp
   mysg.isSelect =false
   mysg.c = c
   gp.waiting = mysg
   gp.param =nil
   c.sendq.enqueue(mysg)
   atomic.Store8(&gp.parkingOnChan,1)gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend,2)KeepAlive(ep)// someone woke us up.if mysg != gp.waiting {throw("G waiting list is corrupted")}
   gp.waiting =nil
   gp.activeStackChans =false
   closed :=!mysg.success
   gp.param =nilif mysg.releasetime >0{blockevent(mysg.releasetime-t0,2)}
   mysg.c =nilreleaseSudog(mysg)if closed {if c.closed ==0{throw("chansend: spurious wakeup")}panic(plainError("send on closed channel"))}returntrue}

1.如果是非阻塞通道且通道满了,忽略本次数据发送,直接返回false;
2.加锁锁住通道;
3.如果通道关闭,表示往关闭的通道发送数据,崩溃;
4.接收协程队列非空,表示通道无数据,接收协程们都在摸鱼等待数据,出队一个协程发送数据(数据拷贝给目标协程,并将目标协程放入当前线程m的p本地队列中等待调度),解锁通道,返回true;
5.通道有数据且没满,将数据移动到缓冲区末尾,解锁通道,返回true;
6.如果通道非阻塞的,经过以上步骤到这里表示通道满了,忽略本次数据发送,解锁通道,返回false;
7.经过以上步骤到这里表示通道满了且是阻塞通道,将当前协程的数据加入发送协程队列,挂起当前协程等待被接收协程唤醒;
8.出让线程m,让m去执行其它逻辑;

channel接收

funcchanrecv(c *hchan, ep unsafe.Pointer, block bool)(selected, received bool){// raceenabled: don't need to check ep, as it is always on the stack// or is new memory allocated by reflect.if debugChan {print("chanrecv: chan=", c,"\n")}if c ==nil{if!block {return}gopark(nil,nil, waitReasonChanReceiveNilChan, traceEvGoStop,2)throw("unreachable")}// Fast path: check for failed non-blocking operation without acquiring the lock.if!block &&empty(c){if atomic.Load(&c.closed)==0{return}ifempty(c){// The channel is irreversibly closed and empty.if raceenabled {raceacquire(c.raceaddr())}if ep !=nil{typedmemclr(c.elemtype, ep)}returntrue,false}}var t0 int64if blockprofilerate >0{
        t0 =cputicks()}lock(&c.lock)if c.closed !=0{if c.qcount ==0{if raceenabled {raceacquire(c.raceaddr())}unlock(&c.lock)if ep !=nil{typedmemclr(c.elemtype, ep)}returntrue,false}// The channel has been closed, but the channel's buffer have data.}else{// Just found waiting sender with not closed.if sg := c.sendq.dequeue(); sg !=nil{recv(c, sg, ep,func(){unlock(&c.lock)},3)returntrue,true}}if c.qcount >0{// Receive directly from queue
        qp :=chanbuf(c, c.recvx)if raceenabled {racenotify(c, c.recvx,nil)}if ep !=nil{typedmemmove(c.elemtype, ep, qp)}typedmemclr(c.elemtype, qp)
        c.recvx++if c.recvx == c.dataqsiz {
            c.recvx =0}
        c.qcount--unlock(&c.lock)returntrue,true}if!block {unlock(&c.lock)returnfalse,false}// no sender available: block on this channel.
    gp :=getg()
    mysg :=acquireSudog()
    mysg.releasetime =0if t0 !=0{
        mysg.releasetime =-1}// No stack splits between assigning elem and enqueuing mysg// on gp.waiting where copystack can find it.
    mysg.elem = ep
    mysg.waitlink =nil
    gp.waiting = mysg
    mysg.g = gp
    mysg.isSelect =false
    mysg.c = c
    gp.param =nil
    c.recvq.enqueue(mysg)
    atomic.Store8(&gp.parkingOnChan,1)gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv,2)// someone woke us upif mysg != gp.waiting {throw("G waiting list is corrupted")}
    gp.waiting =nil
    gp.activeStackChans =falseif mysg.releasetime >0{blockevent(mysg.releasetime-t0,2)}
    success := mysg.success
    gp.param =nil
    mysg.c =nilreleaseSudog(mysg)returntrue, success
}

1.如果通道是非阻塞的且通道没有数据,返回false;
2.加锁锁住通道;
3.通道关闭了,解锁通道,返回false;
4.如果发送协程队列非空,表示阻塞通道上有协程卡在发送数据,执行当前协程接收数据(复制数据到当前协程,和发送时一样环形阻塞的发送协程),解锁通道,返回true;
5.否则经过以上且通道有数据,直接移动数据到当前协程,解锁通道,返回true;
6.挂起当前接收协程,等到发送协程唤醒;
7.让出执行线程m去执行别的逻辑;

channel关闭

funcclosechan(c *hchan){if c ==nil{panic(plainError("close of nil channel"))}lock(&c.lock)if c.closed !=0{unlock(&c.lock)panic(plainError("close of closed channel"))}if raceenabled {
        callerpc :=getcallerpc()racewritepc(c.raceaddr(), callerpc, abi.FuncPCABIInternal(closechan))racerelease(c.raceaddr())}

    c.closed =1var glist gList

    // release all readersfor{
        sg := c.recvq.dequeue()if sg ==nil{break}if sg.elem !=nil{typedmemclr(c.elemtype, sg.elem)
            sg.elem =nil}if sg.releasetime !=0{
            sg.releasetime =cputicks()}
        gp := sg.g
        gp.param = unsafe.Pointer(sg)
        sg.success =falseif raceenabled {raceacquireg(gp, c.raceaddr())}
        glist.push(gp)}// release all writers (they will panic)for{
        sg := c.sendq.dequeue()if sg ==nil{break}
        sg.elem =nilif sg.releasetime !=0{
            sg.releasetime =cputicks()}
        gp := sg.g
        gp.param = unsafe.Pointer(sg)
        sg.success =falseif raceenabled {raceacquireg(gp, c.raceaddr())}
        glist.push(gp)}unlock(&c.lock)// Ready all Gs now that we've dropped the channel lock.for!glist.empty(){
        gp := glist.pop()
        gp.schedlink =0goready(gp,3)}}

1.加锁锁住通道;
2.如果接收协程队列非空,说明通道为空,这些协程在摸鱼,给他们每个发送一个nil数据,并将每个协程加入临时协程队列记录;
3.如果发送协程队列非空,说明通道满,给他们每个发送一个nil数据,并将每个协程加入临时协程队列;
4.解锁通道;
5.判断临时协程队列非空,说明有需要唤醒的协程,遍历唤醒;

总结

channel的大体功能函数就分析完了,逻辑还是挺简单的,主要是利用环形缓冲区、lock的加速设计、协程休眠、唤醒机制。
不过还需要注意select {case: ; default:}语句块包裹下的收发:如果是发送或者接收逻辑包裹了select+default,会标记为无阻塞通道,收发不成功立即返回,不会休眠当前协程。

标签: go channel

本文转载自: https://blog.csdn.net/u012785877/article/details/129832315
版权归原作者 lkness 所有, 如有侵权,请联系我们删除。

“channel源码解析”的评论:

还没有评论