Golang 的并发模型属于一种很典型的 CSP(communicating sequential processes) 并发模型,其核心是不要通过共享内存来通信,而应该通过通信来共享内存。具体实现,就是通过 goroutine 来实现并发,然后并发的 goroutine 之间通过 Channel 来进行通信;为此,Golang 的并发也有两个明显特点:
Golang 并发是通过 goroutine 协程来实现的,通过 go 关键字可以非常简单的启动一个协程;通过 go 关键字启动协程之后, 主进程并不会等待协程的执行,而是继续执行直至结束。因此,如果父子进程之间要有控制关系的话,就需要同步机制来保证。
Go 既然天然支持并发,并且可以很简单的实现并发编程,那么这些并发的协程之间,如果同时访问访问内存中的同一个数据,在没有同步的机制下,那么同一个数据的访问一定会出现错乱,因此,在并发的场景,一定要通过同步机制才能确保同一内存数据的正确访问。
当需要进行多批次的计算任务同步,或者需要一对多的协作流程的时候;通过 Context 的关联关系(go 的 context 被设计为包含了父子关系),我们就可以控制子协程的生命周期,而其他的同步方式是无法控制其生命周期的,只能是被动阻塞等待完成或者结束。context 控制子协程的生命周期,是通过 context 的 context.WithTimeout 机制来实现的,这个是一般系统中或者底层各种框架、库的普适用法。
context 对并发做一些控制包括 Context Done 取消、截止时间取消 context.WithDeadline、超时取消 context.WithTimeout 等。
一个简单的 context.WithTimeout 示例如下:
package main import ( "fmt" "sync" "time" "golang.org/x/net/context" ) var ( wg sync.WaitGroup ) func work(ctx context.Context) error { defer wg.Done() for i := 0; i < 1000; i++ { select { case <-time.After(2 * time.Second): fmt.Println("Do work ", i) // we received the signal of cancelation in this channel case <-ctx.Done(): fmt.Println("Cancel the context ", i) return ctx.Err() } } return nil } func main() { ctx, cancel := context.WithTimeout(context.Background(), 4*time.Second) defer cancel() fmt.Println("Hey, I'm going to do some work") wg.Add(1) go work(ctx) wg.Wait() fmt.Println("Finished. I'm done") }
sync mutex 包括互斥锁和读写锁
互斥锁是传统的并发程序对共享资源进行访问控制的主要手段,它由标准库代码包 sync 中的 Mutex 结构体类型代表。sync.Mutex 类型(确切地说,是*sync.Mutex类型)只有两个公开方法:Lock 和 Unlock。Lock 被用于锁定当前的互斥量,而 UnLock 则被用来对当前的互斥量进行解锁。
sync.Mutex 互斥锁使用的最佳实践
sync.RWMutex 读写锁即是针对于读写操作的互斥锁,它与普通的互斥锁最大的不同就是,它可以分别针对读操作和写操作进行锁定和解锁操作。读写锁遵循的访问控制规则与互斥锁有所不同。在读写锁管辖的范围内,它允许任意多个读操作的同时进行;但是在同一时刻,它只允许有一个写操作在进行。并且,在某一个写操作被进行的过程中,读操作的进行也是不被允许的。也就是说,读写锁控制下的多个写操作之间都是互斥的,并且写操作与读操作之间也都是互斥的。但是,多个读操作之间却不存在互斥关系。它的优势在于可以多个协程同时读数据,这样在高并发并且读多写少的场景下,可以大大提高我们的访问性能。
sync.RWMutex 读写锁使用的最佳实践
channel 作为 Go 并发模型的核心思想:不要通过共享内存来通信,而应该通过通信来共享内存,那么在 Go 里面,当然也可以很方便通过 channel 来实现协程的并发和同步了,并且 channel 本身还可以支持有缓冲和无缓冲的,通过 channel + timeout 实现并发协程之间的同步也是常见的一种使用姿势。
具体的可以参考我的《Golang Channel 详细原理和使用技巧》 一文中的详细说明。
虽然 context 可以通过父子关系来控制子协程的生命周期,但是,如果我们的并发协程之间要等待其他协程执行完毕,那么 context 就无法做到了,这个时候就需要 sync.WaitGroup 同步了。
WaitGroup 用来实现 go 协程之间的同步,用来保证多个协程同步执行并等待所有协程执行结束,通过 WaitGroup 机制就可以不用使用 sleep 一个固定时间来进行等待了。
WaitGroup 内部有一个计数器,最初从0开始计数,它总共对我们提供了三个方法:Add(delta int), Done(), Wait()。
Go 里面可以很方便通过 channel 实现协程的并发和同步,因为 channel 本身可以支持有缓冲和无缓冲的,然后可以实现同步和阻塞,通过 channel + timeout 实现并发协程之间的同步也是常见的一种使用姿势。
那么 WaitGroup 和 channel 相比,他们有啥区别呢,如下:
同时,sync.WaitGroup 和 Channel 还可以可以结合使用,具体点击这里查看。
假如,我们需要创建多个 goroutine 去并发执行业务逻辑,然后一定要等这些并发全部完成后才继续接下来的程序逻辑执行,这个是一个非常场景的业务场景,大多数业务场景都需要这样。一个简单示例如下:
package mainimport ( "fmt" "sync")var waitgroup sync.WaitGroupfunc Afunction(shownum int) { fmt.Println(shownum) waitgroup.Done() //任务完成,将任务队列中的任务数量-1,其实.Done就是.Add(-1)}func main() { for i := 0; i < 10; i++ { waitgroup.Add(1) //每创建一个goroutine,就把任务队列中任务的数量+1, 一定要在 go 之外调用 go Afunction(i) } waitgroup.Wait() //.Wait()这里会发生阻塞,直到队列中所有的任务结束就会解除阻塞}
WaitGroup 可以实现并发同步的控制,最大优点是可以通过 Wait() 阻塞直到并发协程中的所有任务都执行完才解除阻塞,但是依然有它的一些问题,包括:
因为 WaitGroup 无法帮助我们返回所需要的错误信息,或者并发协程中只要一个 goroutine 出错我们就不再等其他 goroutine 了,因此,在实际项目应用中,还是会有一些不便,因此 errgroup 就做了这个事情。
官方原生的 errgroup 包 就是对 sync.waitGroup 的进一步封装,并且同时通过 Context 来控制超时,然后把 error 相关的处理逻辑给我们实现好了,使用 Go() 函数返回的第一个错误来停止所有协程,使用 errgroup 的时候尤其需要小心踩坑闭包问题。具体的使用示例,可以直接查看 官方原生的 errgroup 包 中的例子。
errgroup 包提供的 Group 结构体如下:
type Group struct { // context 的 cancel 方法 cancel func() // 复用 WaitGroup,可以同步等待多个并发的协程执行完毕 wg sync.WaitGroup // 单例模式,用来保证只会接受一次错误 errOnce sync.Once // 保存第一个返回的错误 err error}
需要注意的是, errgroup 这里有两个问题:
由于原生的 errgroup 有上面两个问题,因此很多公司内部,或者社区,都在原生的基础上做了一些扩展封装,用来解决原生 errgroup 的问题,比如 kratos 的 errgroup,在原生的 errgroup 之上,做了进一步的扩展,改进了没有 recover 问题和并发数量问题,err 中包含了panic 发生时的详细堆栈信息,使得 errgroup 机制更加健壮。