csp模型介绍
csp
模型是golang采用的共享内存模型,比对传统多线程共享内存采用lock、condition等方式来规定执行顺序的方式,golang里的csp更强调信道(channel),不关心信道发送、接收方是谁,双方通过信道收发信息。
ps:和
csp
模型对应的
actor
模型,强调通信中的角色(actor),即收发双方,不看重通道。并且角色对外不提供任何接口访问,只约定通过通信异步交换信息,发送方必须知道接收方是谁,例如一个人要给另个人传递消息通过给对方的邮箱写信或者发短信。
channel
golang的channel实现了多线程并发安全的发送和接收方法,可以轻松实现多协程的生产消费模型。
源码解析(go1.19.3:src/runtime/chan.go)
数据结构定义
// 位于源码src/runtime/chan.gotype hchan struct{
qcount uint// total data in the queue
dataqsiz uint// size of the circular queue
buf unsafe.Pointer // points to an array of dataqsiz elements
elemsize uint16
closed uint32
elemtype *_type // element type
sendx uint// send index
recvx uint// receive index
recvq waitq // list of recv waiters
sendq waitq // list of send waiters// lock protects all fields in hchan, as well as several// fields in sudogs blocked on this channel.//// Do not change another G's status while holding this lock// (in particular, do not ready a G), as this can deadlock// with stack shrinking.
lock mutex
}
channel实现为一个固定大小的环形缓冲区,sendx、recvx表示缓冲区的头和尾,所以channel的收发可以一直复用这块缓冲区。
- qcount: 缓冲区当前元素个数
- dataqsiz: 缓冲区的容量
- buf: 缓冲区(初始分配的元素数组)
- elemsize: 元素的内存格式长度
- closed: channel是否已经关闭
- eletype: 元素的类型
- sendx: 环形缓冲区的已写入元素最大索引
- recvx: 环形缓冲区的已写入元素最小索引
- recvq: 阻塞接收数据的协程队列(队列不为空表示channel无数据)
- sendq: 阻塞发送数据的协程队列(队列不为空表示channel满)
- lock: 一把互斥大锁
初始化channel
funcmakechan(t *chantype, size int)*hchan {
elem := t.elem
// compiler checks this but be safe.if elem.size >=1<<16{throw("makechan: invalid channel element type")}if hchanSize%maxAlign !=0|| elem.align > maxAlign {throw("makechan: bad alignment")}
mem, overflow := math.MulUintptr(elem.size,uintptr(size))if overflow || mem > maxAlloc-hchanSize || size <0{panic(plainError("makechan: size out of range"))}// Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.// buf points into the same allocation, elemtype is persistent.// SudoG's are referenced from their owning thread so they can't be collected.// TODO(dvyukov,rlh): Rethink when collector can move allocated objects.var c *hchan
switch{case mem ==0:// Queue or element size is zero.
c =(*hchan)(mallocgc(hchanSize,nil,true))// Race detector uses this location for synchronization.
c.buf = c.raceaddr()case elem.ptrdata ==0:// Elements do not contain pointers.// Allocate hchan and buf in one call.
c =(*hchan)(mallocgc(hchanSize+mem,nil,true))
c.buf =add(unsafe.Pointer(c), hchanSize)default:// Elements contain pointers.
c =new(hchan)
c.buf =mallocgc(mem, elem,true)}
c.elemsize =uint16(elem.size)
c.elemtype = elem
c.dataqsiz =uint(size)lockInit(&c.lock, lockRankHchan)if debugChan {print("makechan: chan=", c,"; elemsize=", elem.size,"; dataqsiz=", size,"\n")}return c
}
1.校验channel元素类型大小不能超过64k、元素内存对齐系数不超过8字节;
2.校验缓冲区申请的长度是否合法;
3.根据channel类型分配不同的内存:阻塞通道(长度为0)就创建包含空buf的通道;非阻塞且结构体数据的通道(长度大于0,元素为struct{xxx})就创建hchan+长度*元素大小的内存;非阻塞且指针数据的通道(长度大于0,元素为ptr)就创建hchan和另一个长度大小的内存作为buf;
channel发送
funcchansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr)bool{if c ==nil{if!block {returnfalse}gopark(nil,nil, waitReasonChanSendNilChan, traceEvGoStop,2)throw("unreachable")}if debugChan {print("chansend: chan=", c,"\n")}if raceenabled {racereadpc(c.raceaddr(), callerpc, abi.FuncPCABIInternal(chansend))}if!block && c.closed ==0&&full(c){returnfalse}var t0 int64if blockprofilerate >0{
t0 =cputicks()}lock(&c.lock)if c.closed !=0{unlock(&c.lock)panic(plainError("send on closed channel"))}if sg := c.recvq.dequeue(); sg !=nil{// Found a waiting receiver. We pass the value we want to send// directly to the receiver, bypassing the channel buffer (if any).send(c, sg, ep,func(){unlock(&c.lock)},3)returntrue}if c.qcount < c.dataqsiz {// Space is available in the channel buffer. Enqueue the element to send.
qp :=chanbuf(c, c.sendx)if raceenabled {racenotify(c, c.sendx,nil)}typedmemmove(c.elemtype, qp, ep)
c.sendx++if c.sendx == c.dataqsiz {
c.sendx =0}
c.qcount++unlock(&c.lock)returntrue}if!block {unlock(&c.lock)returnfalse}// Block on the channel. Some receiver will complete our operation for us.
gp :=getg()
mysg :=acquireSudog()
mysg.releasetime =0if t0 !=0{
mysg.releasetime =-1}// No stack splits between assigning elem and enqueuing mysg// on gp.waiting where copystack can find it.
mysg.elem = ep
mysg.waitlink =nil
mysg.g = gp
mysg.isSelect =false
mysg.c = c
gp.waiting = mysg
gp.param =nil
c.sendq.enqueue(mysg)
atomic.Store8(&gp.parkingOnChan,1)gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend,2)KeepAlive(ep)// someone woke us up.if mysg != gp.waiting {throw("G waiting list is corrupted")}
gp.waiting =nil
gp.activeStackChans =false
closed :=!mysg.success
gp.param =nilif mysg.releasetime >0{blockevent(mysg.releasetime-t0,2)}
mysg.c =nilreleaseSudog(mysg)if closed {if c.closed ==0{throw("chansend: spurious wakeup")}panic(plainError("send on closed channel"))}returntrue}
1.如果是非阻塞通道且通道满了,忽略本次数据发送,直接返回false;
2.加锁锁住通道;
3.如果通道关闭,表示往关闭的通道发送数据,崩溃;
4.接收协程队列非空,表示通道无数据,接收协程们都在摸鱼等待数据,出队一个协程发送数据(数据拷贝给目标协程,并将目标协程放入当前线程m的p本地队列中等待调度),解锁通道,返回true;
5.通道有数据且没满,将数据移动到缓冲区末尾,解锁通道,返回true;
6.如果通道非阻塞的,经过以上步骤到这里表示通道满了,忽略本次数据发送,解锁通道,返回false;
7.经过以上步骤到这里表示通道满了且是阻塞通道,将当前协程的数据加入发送协程队列,挂起当前协程等待被接收协程唤醒;
8.出让线程m,让m去执行其它逻辑;
channel接收
funcchanrecv(c *hchan, ep unsafe.Pointer, block bool)(selected, received bool){// raceenabled: don't need to check ep, as it is always on the stack// or is new memory allocated by reflect.if debugChan {print("chanrecv: chan=", c,"\n")}if c ==nil{if!block {return}gopark(nil,nil, waitReasonChanReceiveNilChan, traceEvGoStop,2)throw("unreachable")}// Fast path: check for failed non-blocking operation without acquiring the lock.if!block &&empty(c){if atomic.Load(&c.closed)==0{return}ifempty(c){// The channel is irreversibly closed and empty.if raceenabled {raceacquire(c.raceaddr())}if ep !=nil{typedmemclr(c.elemtype, ep)}returntrue,false}}var t0 int64if blockprofilerate >0{
t0 =cputicks()}lock(&c.lock)if c.closed !=0{if c.qcount ==0{if raceenabled {raceacquire(c.raceaddr())}unlock(&c.lock)if ep !=nil{typedmemclr(c.elemtype, ep)}returntrue,false}// The channel has been closed, but the channel's buffer have data.}else{// Just found waiting sender with not closed.if sg := c.sendq.dequeue(); sg !=nil{recv(c, sg, ep,func(){unlock(&c.lock)},3)returntrue,true}}if c.qcount >0{// Receive directly from queue
qp :=chanbuf(c, c.recvx)if raceenabled {racenotify(c, c.recvx,nil)}if ep !=nil{typedmemmove(c.elemtype, ep, qp)}typedmemclr(c.elemtype, qp)
c.recvx++if c.recvx == c.dataqsiz {
c.recvx =0}
c.qcount--unlock(&c.lock)returntrue,true}if!block {unlock(&c.lock)returnfalse,false}// no sender available: block on this channel.
gp :=getg()
mysg :=acquireSudog()
mysg.releasetime =0if t0 !=0{
mysg.releasetime =-1}// No stack splits between assigning elem and enqueuing mysg// on gp.waiting where copystack can find it.
mysg.elem = ep
mysg.waitlink =nil
gp.waiting = mysg
mysg.g = gp
mysg.isSelect =false
mysg.c = c
gp.param =nil
c.recvq.enqueue(mysg)
atomic.Store8(&gp.parkingOnChan,1)gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv,2)// someone woke us upif mysg != gp.waiting {throw("G waiting list is corrupted")}
gp.waiting =nil
gp.activeStackChans =falseif mysg.releasetime >0{blockevent(mysg.releasetime-t0,2)}
success := mysg.success
gp.param =nil
mysg.c =nilreleaseSudog(mysg)returntrue, success
}
1.如果通道是非阻塞的且通道没有数据,返回false;
2.加锁锁住通道;
3.通道关闭了,解锁通道,返回false;
4.如果发送协程队列非空,表示阻塞通道上有协程卡在发送数据,执行当前协程接收数据(复制数据到当前协程,和发送时一样环形阻塞的发送协程),解锁通道,返回true;
5.否则经过以上且通道有数据,直接移动数据到当前协程,解锁通道,返回true;
6.挂起当前接收协程,等到发送协程唤醒;
7.让出执行线程m去执行别的逻辑;
channel关闭
funcclosechan(c *hchan){if c ==nil{panic(plainError("close of nil channel"))}lock(&c.lock)if c.closed !=0{unlock(&c.lock)panic(plainError("close of closed channel"))}if raceenabled {
callerpc :=getcallerpc()racewritepc(c.raceaddr(), callerpc, abi.FuncPCABIInternal(closechan))racerelease(c.raceaddr())}
c.closed =1var glist gList
// release all readersfor{
sg := c.recvq.dequeue()if sg ==nil{break}if sg.elem !=nil{typedmemclr(c.elemtype, sg.elem)
sg.elem =nil}if sg.releasetime !=0{
sg.releasetime =cputicks()}
gp := sg.g
gp.param = unsafe.Pointer(sg)
sg.success =falseif raceenabled {raceacquireg(gp, c.raceaddr())}
glist.push(gp)}// release all writers (they will panic)for{
sg := c.sendq.dequeue()if sg ==nil{break}
sg.elem =nilif sg.releasetime !=0{
sg.releasetime =cputicks()}
gp := sg.g
gp.param = unsafe.Pointer(sg)
sg.success =falseif raceenabled {raceacquireg(gp, c.raceaddr())}
glist.push(gp)}unlock(&c.lock)// Ready all Gs now that we've dropped the channel lock.for!glist.empty(){
gp := glist.pop()
gp.schedlink =0goready(gp,3)}}
1.加锁锁住通道;
2.如果接收协程队列非空,说明通道为空,这些协程在摸鱼,给他们每个发送一个nil数据,并将每个协程加入临时协程队列记录;
3.如果发送协程队列非空,说明通道满,给他们每个发送一个nil数据,并将每个协程加入临时协程队列;
4.解锁通道;
5.判断临时协程队列非空,说明有需要唤醒的协程,遍历唤醒;
总结
channel的大体功能函数就分析完了,逻辑还是挺简单的,主要是利用环形缓冲区、lock的加速设计、协程休眠、唤醒机制。
不过还需要注意select {case: ; default:}语句块包裹下的收发:如果是发送或者接收逻辑包裹了select+default,会标记为无阻塞通道,收发不成功立即返回,不会休眠当前协程。
版权归原作者 lkness 所有, 如有侵权,请联系我们删除。