轻松掌握Go通道:一篇文章就够了

发表时间: 2021-05-09 12:10

1.简介

channel是Go语言的一大特性,基于channel有很多值得探讨的问题,如

  1. channel为什么是并发安全的?
  2. 同步通道和异步通道有啥区别?
  3. 通道为何会阻塞协程?
  4. 使用通道导致阻塞的协程是如何解除阻塞的?

要了解本质,需要进源码查看,毕竟源码之下了无秘密。

2.原理

2.1创建

channel理论上有三种,带缓冲\不带缓冲\nil,写法如下:

// bufferedch := make(chan Task, 3)// unbufferedch := make(chan int)// nilvar ch chan int复制代码

追踪make函数,会发现在builtin/builtin.go中仅有一个声明func make(t Type, size ...IntegerType) Type。真正的实现可以参考go内置函数make,简单来说在
cmd/compile/internal/gc/typecheck.go中有函数typecheck1

// The result of typecheck1 MUST be assigned back to n, e.g.// 	n.Left = typecheck1(n.Left, top)func typecheck1(n *Node, top int) (res *Node) {	if enableTrace && trace {		defer tracePrint("typecheck1", n)(&res)	}	switch n.Op {	case OMAKE:		ok |= ctxExpr		args := n.List.Slice()		if len(args) == 0 {			yyerror("missing argument to make")			n.Type = nil			return n		}		n.List.Set(nil)		l := args[0]		l = typecheck(l, Etype)		t := l.Type		if t == nil {			n.Type = nil			return n		}		i := 1		switch t.Etype {		default:			yyerror("cannot make type %v", t)			n.Type = nil			return n		case TCHAN:			l = nil			if i < len(args) {				l = args[i]				i++				l = typecheck(l, ctxExpr)				l = defaultlit(l, types.Types[TINT])				if l.Type == nil {					n.Type = nil					return n				}				if !checkmake(t, "buffer", l) {					n.Type = nil					return n				}				n.Left = l			} else {				n.Left = nodintconst(0)			}			n.Op = OMAKECHAN //对应的函数位置		}		if i < len(args) {			yyerror("too many arguments to make(%v)", t)			n.Op = OMAKE			n.Type = nil			return n		}		n.Type = t		if (top&ctxStmt != 0) && top&(ctxCallee|ctxExpr|Etype) == 0 && ok&ctxStmt == 0 {			if !n.Diag() {				yyerror("%v evaluated but not used", n)				n.SetDiag(true)			}			n.Type = nil			return n		}		return n	}}复制代码

最终真正实现位置为runtime/chan.go

func makechan(t *chantype, size int) *hchan {   elem := t.elem   // compiler checks this but be safe.   if elem.size >= 1<<16 {      throw("makechan: invalid channel element type")   }   if hchanSize%maxAlign != 0 || elem.align > maxAlign {      throw("makechan: bad alignment")   }   mem, overflow := math.MulUintptr(elem.size, uintptr(size))   if overflow || mem > maxAlloc-hchanSize || size < 0 {      panic(plainError("makechan: size out of range"))   }   // Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.   // buf points into the same allocation, elemtype is persistent.   // SudoG's are referenced from their owning thread so they can't be collected.   // TODO(dvyukov,rlh): Rethink when collector can move allocated objects.   var c *hchan   switch {   case mem == 0:      // Queue or element size is zero.      c = (*hchan)(mallocgc(hchanSize, nil, true))      // Race detector uses this location for synchronization.      c.buf = c.raceaddr()   case elem.kind&kindNoPointers != 0:      // Elements do not contain pointers.      // Allocate hchan and buf in one call.      c = (*hchan)(mallocgc(hchanSize+mem, nil, true))      c.buf = add(unsafe.Pointer(c), hchanSize)   default:      // Elements contain pointers.      c = new(hchan)      c.buf = mallocgc(mem, elem, true)   }   c.elemsize = uint16(elem.size)   c.elemtype = elem   c.dataqsiz = uint(size)   if debugChan {      print("makechan: chan=", c, "; elemsize=", elem.size, "; elemalg=", elem.alg, "; dataqsiz=", size, "\n")   }   return c}复制代码

从这个函数可以看出,channel的数据结构为hchan

2.2结构

接下来我们看一下channel的数据结构,基于数据结构,可以推测出具体实现。

runtime/chan.go

type hchan struct {	//channel队列里面总的数据量	qcount   uint           // total data in the queue	// 循环队列的容量,如果是非缓冲的channel就是0	dataqsiz uint           // size of the circular queue	// 缓冲队列,数组类型。	buf      unsafe.Pointer // points to an array of dataqsiz elements	// 元素占用字节的size	elemsize uint16	// 当前队列关闭标志位,非零表示关闭	closed   uint32	// 队列里面元素类型	elemtype *_type // element type	// 队列send索引	sendx    uint   // send index	// 队列索引	recvx    uint   // receive index	// 等待channel的G队列。	recvq    waitq  // list of recv waiters	// 向channel发送数据的G队列。	sendq    waitq  // list of send waiters	// lock protects all fields in hchan, as well as several	// fields in sudogs blocked on this channel.	//	// Do not change another G's status while holding this lock	// (in particular, do not ready a G), as this can deadlock	// with stack shrinking.	// 全局锁	lock mutex}复制代码

通过该hchan的数据结构和makechan函数,数据结构里有几个值得说明的数据:

  1. dataqsiz表示channel的长度,如果为非缓冲队列,则值为0。通过dataqsiz实现环形队列。
  2. buf存放真正的数据
  3. sendx和recvx指在环形队列中数据入channel和出channel的位置
  4. sendq存放向channel发送数据的goroutine队列
  5. recvq存放等待获取channel数据的goroutine队列
  6. lock为全局锁

2.3Anwser

通过追查到的代码,我们可以回答最开始提出的几个问题了。

2.3.1channel为什么是并发安全的?

因为做操作之前,都会先获取全局锁,只有获取成功的才能进行操作,保证了并发安全。

2.3.2同步通道和异步通道有啥区别?

使用的底层数据结构、操作代码都是一样的,只不过dataqsiz的值不一样,一个为0,一个为正数。

2.3.3通道为何会阻塞协程?

当通道已经满了,但协程继续往通道里写入,或者通道里没有数据,但是协程从通道里获取数据时,协程会被阻塞。

实现的原理与Golang并发调度的GMP模型强相关。

写入满通道的流程

  1. 当前goroutine(G1)创建自身的一个引用(sudog),放置到hchan的sendq队列
  2. 当前goroutine(G1)会调用gopark函数,将当前协程置为waiting状态;
  3. 将M和G1绑定关系断开;
  4. scheduler会调度另外一个就绪态的goroutine与M建立绑定关系,然后M 会运行另外一个G。

读取空通道的流程

  1. 当前goroutine(G2)会创建自身的一个引用(sudog)
  2. 将代表G2的sudog存入recvq等待队列
  3. G2会调用gopark函数进入等待状态,让出OS thread,然后G2进入阻塞态

2.3.4使用通道导致阻塞的协程是如何解除阻塞的?

对于已经满的通道,当有协程G2做读操作时,会解除G1的阻塞,流程为

  1. G2调用 t:=<-ch 获取一个元素A;
  2. 从hchan的buf里面取出一个元素;
  3. 从sendq等待队列里面pop一个sudog;
  4. 将G1要写入的数据复制到buf中A的位置,然后更新buf的sendx和recvx索引值;
  5. G2调用goready(G1)将G1置为Runable状态,表示G1可以恢复运行;

对于读取空的通道,当有协程G1做写操作时,会解除G2的阻塞,流程为

  1. 将待写入的消息发送给接收的goroutine G2;
  2. G1调用goready(G2) 将G2设置成就绪状态,等待调度;

2.4实现

我们来看一下chan的具体实现

2.4.1读取数据

// chanrecv receives on channel c and writes the received data to ep.// ep may be nil, in which case received data is ignored.// If block == false and no elements are available, returns (false, false).// Otherwise, if c is closed, zeros *ep and returns (true, false).// Otherwise, fills in *ep with an element and returns (true, true).// A non-nil ep must point to the heap or the caller's stack.func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {   // raceenabled: don't need to check ep, as it is always on the stack   // or is new memory allocated by reflect.   if debugChan {      print("chanrecv: chan=", c, "\n")   }   if c == nil {      if !block {         return      }      gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)      throw("unreachable")   }   // Fast path: check for failed non-blocking operation without acquiring the lock.   //   // After observing that the channel is not ready for receiving, we observe that the   // channel is not closed. Each of these observations is a single word-sized read   // (first c.sendq.first or c.qcount, and second c.closed).   // Because a channel cannot be reopened, the later observation of the channel   // being not closed implies that it was also not closed at the moment of the   // first observation. We behave as if we observed the channel at that moment   // and report that the receive cannot proceed.   //   // The order of operations is important here: reversing the operations can lead to   // incorrect behavior when racing with a close.   if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||      c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) &&      atomic.Load(&c.closed) == 0 {      return   }   var t0 int64   if blockprofilerate > 0 {      t0 = cputicks()   }   lock(&c.lock)   if c.closed != 0 && c.qcount == 0 {      if raceenabled {         raceacquire(c.raceaddr())      }      unlock(&c.lock)      if ep != nil {         typedmemclr(c.elemtype, ep)      }      return true, false   }   if sg := c.sendq.dequeue(); sg != nil {      // Found a waiting sender. If buffer is size 0, receive value      // directly from sender. Otherwise, receive from head of queue      // and add sender's value to the tail of the queue (both map to      // the same buffer slot because the queue is full).      recv(c, sg, ep, func() { unlock(&c.lock) }, 3)      return true, true   }   if c.qcount > 0 {      // Receive directly from queue      qp := chanbuf(c, c.recvx)      if raceenabled {         raceacquire(qp)         racerelease(qp)      }      if ep != nil {         typedmemmove(c.elemtype, ep, qp)      }      typedmemclr(c.elemtype, qp)      c.recvx++      if c.recvx == c.dataqsiz {         c.recvx = 0      }      c.qcount--      unlock(&c.lock)      return true, true   }   if !block {      unlock(&c.lock)      return false, false   }   // no sender available: block on this channel.   gp := getg()   mysg := acquireSudog()   mysg.releasetime = 0   if t0 != 0 {      mysg.releasetime = -1   }   // No stack splits between assigning elem and enqueuing mysg   // on gp.waiting where copystack can find it.   mysg.elem = ep   mysg.waitlink = nil   gp.waiting = mysg   mysg.g = gp   mysg.isSelect = false   mysg.c = c   gp.param = nil   c.recvq.enqueue(mysg)   goparkunlock(&c.lock, waitReasonChanReceive, traceEvGoBlockRecv, 3)   // someone woke us up   if mysg != gp.waiting {      throw("G waiting list is corrupted")   }   gp.waiting = nil   if mysg.releasetime > 0 {      blockevent(mysg.releasetime-t0, 2)   }   closed := gp.param == nil   gp.param = nil   mysg.c = nil   releaseSudog(mysg)   return true, !closed}复制代码

接收channel的数据的流程如下:

CASE1:前置channel为nil的场景:

如果block为非阻塞,直接return;

如果block为阻塞,就调用gopark()阻塞当前goroutine,并抛出异常。

  • 前置场景,block为非阻塞,且channel为非缓冲队列且sender等待队列为空 或者 channel为有缓冲队列但是队列里面元素数量为0,且channel未关闭,这个时候直接return;
  • 调用 lock(&c.lock) 锁住channel的全局锁;

CASE2:channel已经被关闭且channel缓冲中没有数据了,这时直接返回success和空值;

CASE3:sender队列非空,调用 func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int)

函数处理:

1.先取channel缓冲队列的对头元素复制给receiver(也就是ep);

2.将sender队列的对头元素里面的数据复制到channel缓冲队列刚刚弹出的元素的位置,这样缓冲队列就不用移动数据了。

channel是非缓冲channel,直接调用recvDirect函数直接从sender recv元素到ep对象,这样就只用复制一次;

对于sender队列非空情况下, 有缓冲的channel的缓冲队列一定是满的:

释放channel的全局锁;

调用goready函数标记当前goroutine处于ready,可以运行的状态;

CASE4:sender队列为空,缓冲队列非空,直接取队列元素,移动头索引;

CASE5:sender队列为空、缓冲队列也没有元素且不阻塞协程,直接return (false,false);

CASE6:sender队列为空且channel的缓存队列为空,将goroutine加入recv队列,并阻塞。

2.4.2写入数据

/* * generic single channel send/recv * If block is not nil, * then the protocol will not * sleep but return if it could * not complete. * * sleep can wake up with g.param == nil * when a channel involved in the sleep has * been closed.  it is easiest to loop and re-run * the operation; we'll see that it's now closed. */func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {   if c == nil {      if !block {         return false      }      gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)      throw("unreachable")   }   if debugChan {      print("chansend: chan=", c, "\n")   }   if raceenabled {      racereadpc(c.raceaddr(), callerpc, funcPC(chansend))   }   // Fast path: check for failed non-blocking operation without acquiring the lock.   //   // After observing that the channel is not closed, we observe that the channel is   // not ready for sending. Each of these observations is a single word-sized read   // (first c.closed and second c.recvq.first or c.qcount depending on kind of channel).   // Because a closed channel cannot transition from 'ready for sending' to   // 'not ready for sending', even if the channel is closed between the two observations,   // they imply a moment between the two when the channel was both not yet closed   // and not ready for sending. We behave as if we observed the channel at that moment,   // and report that the send cannot proceed.   //   // It is okay if the reads are reordered here: if we observe that the channel is not   // ready for sending and then observe that it is not closed, that implies that the   // channel wasn't closed during the first observation.   if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) ||      (c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {      return false   }   var t0 int64   if blockprofilerate > 0 {      t0 = cputicks()   }   lock(&c.lock)   if c.closed != 0 {      unlock(&c.lock)      panic(plainError("send on closed channel"))   }   if sg := c.recvq.dequeue(); sg != nil {      // Found a waiting receiver. We pass the value we want to send      // directly to the receiver, bypassing the channel buffer (if any).      send(c, sg, ep, func() { unlock(&c.lock) }, 3)      return true   }   if c.qcount < c.dataqsiz {      // Space is available in the channel buffer. Enqueue the element to send.      qp := chanbuf(c, c.sendx)      if raceenabled {         raceacquire(qp)         racerelease(qp)      }      typedmemmove(c.elemtype, qp, ep)      c.sendx++      if c.sendx == c.dataqsiz {         c.sendx = 0      }      c.qcount++      unlock(&c.lock)      return true   }   if !block {      unlock(&c.lock)      return false   }   // Block on the channel. Some receiver will complete our operation for us.   gp := getg()   mysg := acquireSudog()   mysg.releasetime = 0   if t0 != 0 {      mysg.releasetime = -1   }   // No stack splits between assigning elem and enqueuing mysg   // on gp.waiting where copystack can find it.   mysg.elem = ep   mysg.waitlink = nil   mysg.g = gp   mysg.isSelect = false   mysg.c = c   gp.waiting = mysg   gp.param = nil   c.sendq.enqueue(mysg)   goparkunlock(&c.lock, waitReasonChanSend, traceEvGoBlockSend, 3)   // Ensure the value being sent is kept alive until the   // receiver copies it out. The sudog has a pointer to the   // stack object, but sudogs aren't considered as roots of the   // stack tracer.   KeepAlive(ep)   // someone woke us up.   if mysg != gp.waiting {      throw("G waiting list is corrupted")   }   gp.waiting = nil   if gp.param == nil {      if c.closed == 0 {         throw("chansend: spurious wakeup")      }      panic(plainError("send on closed channel"))   }   gp.param = nil   if mysg.releasetime > 0 {      blockevent(mysg.releasetime-t0, 2)   }   mysg.c = nil   releaseSudog(mysg)   return true}复制代码

向channel写入数据主要流程如下:

  • CASE1:当channel为空或者未初始化,如果block表示阻塞那么向其中发送数据将会永久阻塞;如果block表示非阻塞就会直接return;
  • CASE2:前置场景,block为非阻塞,且channel没有关闭(已关闭的channel不能写入数据)且(channel为非缓冲队列且receiver等待队列为空)或则( channel为有缓冲队列但是队列已满),这个时候直接return;
  • 调用 lock(&c.lock) 锁住channel的全局锁;
  • CASE3:不能向已经关闭的channel send数据,会导致panic。
  • CASE4:如果channel上的recv队列非空,则跳过channel的缓存队列,直接向消息发送给接收的goroutine:
    • 调用sendDirect方法,将待写入的消息发送给接收的goroutine;
    • 释放channel的全局锁;
    • 调用goready函数,将接收消息的goroutine设置成就绪状态,等待调度。
  • CASE5:缓存队列未满,则将消息复制到缓存队列上,然后释放全局锁;
  • CASE6:缓存队列已满且接收消息队列recv为空,则将当前的goroutine加入到send队列;
    • 获取当前goroutine的sudog,然后入channel的send队列;
    • 将当前goroutine休眠

关闭channel

func closechan(c *hchan) {   if c == nil {      panic(plainError("close of nil channel"))   }   lock(&c.lock)   if c.closed != 0 {      unlock(&c.lock)      panic(plainError("close of closed channel"))   }   if raceenabled {      callerpc := getcallerpc()      racewritepc(c.raceaddr(), callerpc, funcPC(closechan))      racerelease(c.raceaddr())   }   c.closed = 1   var glist gList   // release all readers   for {      sg := c.recvq.dequeue()      if sg == nil {         break      }      if sg.elem != nil {         typedmemclr(c.elemtype, sg.elem)         sg.elem = nil      }      if sg.releasetime != 0 {         sg.releasetime = cputicks()      }      gp := sg.g      gp.param = nil      if raceenabled {         raceacquireg(gp, c.raceaddr())      }      glist.push(gp)   }   // release all writers (they will panic)   for {      sg := c.sendq.dequeue()      if sg == nil {         break      }      sg.elem = nil      if sg.releasetime != 0 {         sg.releasetime = cputicks()      }      gp := sg.g      gp.param = nil      if raceenabled {         raceacquireg(gp, c.raceaddr())      }      glist.push(gp)   }   unlock(&c.lock)   // Ready all Gs now that we've dropped the channel lock.   for !glist.empty() {      gp := glist.pop()      gp.schedlink = 0      goready(gp, 3)   }}复制代码

关闭的主要流程如下所示:

  • 获取全局锁;
  • 设置channel数据结构chan的关闭标志位;
  • 获取当前channel上面的读goroutine并链接成链表;
  • 获取当前channel上面的写goroutine然后拼接到前面的读链表后面;
  • 释放全局锁;
  • 唤醒所有的读写goroutine。

总结

了解一下具体实现还是很好的,虽然在使用上不会带来变化,不过理解了内涵后,能够更加灵活地使用通道,可以更加容易的追查到问题,也能学习到高手的设计思想。

资料

  1. Golang-Channel原理解析
  2. golang对于 nil通道 close通道你所不知道的神器特性
  3. Go语言make和new关键字的区别及实现原理
  4. Go底层引用实现
  5. 图解Golang的channel底层原理
  6. go内置函数make
  7. Golang并发调度的GMP模型

最后

大家如果喜欢我的文章,可以关注我的公众号(程序员麻辣烫)

我的个人博客为:shidawuhen.github.io/

往期文章回顾:

技术

  1. Go通道实现原理
  2. Go定时器实现原理
  3. HTTPS连接过程
  4. 限流实现2
  5. 秒杀系统
  6. 分布式系统与一致性协议
  7. 微服务之服务框架和注册中心
  8. Beego框架使用
  9. 浅谈微服务
  10. TCP性能优化
  11. 限流实现1
  12. Redis实现分布式锁
  13. Golang源码BUG追查
  14. 事务原子性、一致性、持久性的实现原理
  15. CDN请求过程详解
  16. 常用缓存技巧
  17. 如何高效对接第三方支付
  18. Gin框架简洁版
  19. InnoDB锁与事务简析
  20. 算法总结

读书笔记

  1. 敏捷革命
  2. 如何锻炼自己的记忆力
  3. 简单的逻辑学-读后感
  4. 热风-读后感
  5. 论语-读后感
  6. 孙子兵法-读后感

思考

  1. 项目流程管理
  2. 对项目管理的一些看法
  3. 对产品经理的一些思考
  4. 关于程序员职业发展的思考
  5. 关于代码review的思考
  6. Markdown编辑器推荐-typora