浅析Go语言中的sync包

发表时间: 2020-09-19 00:11

Golang sync包提供了一些基础的异步操作方法,非常值得学习,这里对sync包几个重要的结构体和方法做个介绍。

sync包

sync包是 golang 一个官方的异步库,提供了一些各种基础的异步的实现,如互斥锁等。sync 包主要包括了以下几种类型:

  • sync.Mutex 和 sync.WaitGroup
  • sync.Once
  • sync.Map
  • sync.Pool
  • sync.Cond

sync.Once

sync.Once 是 Golang package 中使方法只执行一次的对象实现,作用与 init 函数类似。 sync.Once比较多用在初始化,注册和对象创建上。sync.Once 源码:

type Once struct {	// done 表示被执行标识	// 同时因为 done 是高频放在结构体的第一个位置,	// 可以通过结构体指针直接进行访问,而访问其他的	// 字段需要通过偏移量计算相对就会慢一些	done uint32	m    Mutex}// Once结构体Do方法只会被执行一次// ,执行一次后 done 被设置为 1, 后面不再执行func (o *Once) Do(f func()) {	if atomic.LoadUint32(&o.done) == 0 {		// 加锁执行,这里没办法用		//	if atomic.CompareAndSwapUint32(&o.done, 0, 1) {		//		f()		//	}		// 这种形式,这样做等于在f()执行之前已经赋值,如果这时候有两个		// 同时进行调用,一个会执行f(),另一个则会在f()执行结束之前就返回		// 这样会存在问题,因此doSlow加了锁,同时在f()执行之后,再将done存储为1		o.doSlow(f)	}}func (o *Once) doSlow(f func()) {	o.m.Lock()	defer o.m.Unlock()	// 这里是做了 double check	if o.done == 0 {		// 在func执行之后再将done值改变		defer atomic.StoreUint32(&o.done, 1)		f()	}}

sync.Once用法

sync.Once 其中一个应用就是实现一个单例模式,比如在一个结构体中申明,并在初始化的时候使用:

var capInstance struct {	once         sync.Once	lock         sync.Mutex	capabilities *Capabilities}// Initialize the capability set.  This can only be done once per binary, subsequent calls are ignored.func Initialize(c Capabilities) {	// Only do this once	capInstance.once.Do(func() {		capInstance.capabilities = &c	})}


sync.Map

sync.Map是一个线程安全的map结构,一般用于多读少写的并发操作,下图是sync.Map数据结构


图引至码农桃花源公众号


type Map struct {	mu Mutex	read atomic.Value // readOnly	dirty map[interface{}]*entry	misses int}

muMap的互斥锁用于对并发操作进行加锁保护,read是用于存储只读内容的,可以提供高并发的读操作。 dirty是一个原始的map结构体,对dirty的操作需要加锁,dirty包涵了全量的数据,在读数据的时候会先读取readread读取不到再读dirtymissesread读取失败的次数,当多次读取失败后 misses 累计特定值,dirty就会升级成readsync.Map 这里采用的策略类似数据库常用的”读写分离”,技术都是相通的O(∩_∩)O

sync.Map用法

func main() {	var value sync.Map	// 写入	value.Store("your name", "shi")	value.Store("her name", "kanon")	// 读取	name, ok := value.Load("your name")	if !ok {		println("can't find name")	}	fmt.Println(name)	// 遍历	value.Range(func(ki, vi interface{}) bool {		k, v := ki.(string), vi.(string)		fmt.Println(k, v)		return true	})	// 删除	value.Delete("your name")	// 读取,如果不存在则写入	activename, loaded := value.LoadOrStore("his name", "baba")	fmt.Println(activename.(string), loaded)}

sync.Pool

sync.Pool 是一个用来缓存大量重复对象,减少大量对象创建给GC压力,是sync异步包中很重要的一种数据结构,看其基本数据结构:

type Pool struct {  // noCopy 表示不支持值拷贝,如果出现值拷贝用 go vet 编译检查的时候会报错	noCopy noCopy  // [P]poolLocal,表示每个local的P池	local     unsafe.Pointer  // local的长度	localSize uintptr  // 也是[P]poolLocal,表示上一个生命周期的local	victim     unsafe.Pointer  // victim的长度	victimSize uintptr  // 用于创建新对象方法,get获取不到就会调用创建一个新对象,一般由用户传入	New func() interface{}}


图引至码农桃花源公众号

sync.Pool 的用法

sync.Pool的用法很简单,就三个方法:

//初始化pool对象var pool sync.Pooltype shikanon struct {	num int}// 创建新对象创建方法func initPool() {	pool = sync.Pool{		New: func() interface{} {			return &shikanon{num: rand.Int()}		},	}}func main() {  initPool()  // 从pool对象池中取对象	p1 := pool.Get().(*shikanon)	fmt.Println("p1", p1.num)  // 将对象放入pool对象池	pool.Put(p1)	p2 := pool.Get().(*shikanon)  fmt.Println("p2", p2.num)}


sync.Cond

sync.Cond 是用于条件变量(condition variable)实现 —— 它可以让 Goroutine 都在满足特定条件时被唤醒,因此通常和锁一起使用,比如 Mutex 或 RWMutex。Cond 就是 condition 的意思。
sync.Cond 的数据结构:

type Cond struct {	// noCopy 保证结构体不在编译期间被拷贝,如果出现值拷贝用 go vet 编译检查的时候会报错	noCopy noCopy	// 等待条件的锁	L Locker	// 通知列表	notify  notifyList	// 用于禁止运行期间发生的拷贝	checker copyChecker}

从数据结构可以看出,sync.Cond 等于在sync.Mutext的基础上,增加了一个通知列表notify做条件通知。


sync.Cond 主要有三种方法:等待通知(wait),单发通知(signal),广播通知(broadcast)。

// 生成一个cond,需要传入一个Locker,// 因为阻塞等待通知的操作以及通知解除阻塞的操作就是基于Locker来实现的。func NewCond(l Locker) *Cond {	return &Cond{L: l}}// 用于等待通知func (c *Cond) Wait() {	// 检查cond是否被拷贝	c.checker.check()	// 将获得锁的goroutine加入等待队列	t := runtime_notifyListAdd(&c.notify)	c.L.Unlock()	// 将当前 Goroutine 追加到notifyList链表的末端,并让其处于休眠状态,这个操作是阻塞的,	// 让当前 goroutine 休眠主要是通过调用 runtime.goparkunlock 实现	runtime_notifyListWait(&c.notify, t)	c.L.Lock()}// 用于发送单个通知func (c *Cond) Signal() {	c.checker.check()	runtime_notifyListNotifyOne(&c.notify)}// 用于广播func (c *Cond) Broadcast() {	c.checker.check()	runtime_notifyListNotifyAll(&c.notify)}



runtime_notifyListNotifyOne

runtime_notifyListNotifyAll
runtime_notifyListAddruntime_notifyListWait
这几个函数都是用的runtime下的sema.go文件link过来的,这里先不深究,看看notifyList数据结构:

type notifyList struct {	// wait is the ticket number of the next waiter. It is atomically	// incremented outside the lock.	wait uint32	// notify is the ticket number of the next waiter to be notified. It can	// be read outside the lock, but is only written to with lock held.	//	// Both wait & notify can wrap around, and such cases will be correctly	// handled as long as their "unwrapped" difference is bounded by 2^31.	// For this not to be the case, we'd need to have 2^31+ goroutines	// blocked on the same condvar, which is currently not possible.	notify uint32	// List of parked waiters.	lock mutex	head *sudog	tail *sudog}

sync.Cond 的用法

sync.Cond 主要用于消息广播中(主要是单通知大家用的更多地是 channel + select )。比较经典的sync.Cond实现有etcd的 FIFO Scheduler 的实现:

func NewFIFOScheduler() Scheduler {	f := &fifo{		resume: make(chan struct{}, 1),		donec:  make(chan struct{}, 1),	}	// 生成一个Cond对象	f.finishCond = sync.NewCond(&f.mu)	f.ctx, f.cancel = context.WithCancel(context.Background())	go f.run()	return f}// WaitFinish 用于 等待至少 n 个任务被完成 或所有 pending任务被完成func (f *fifo) WaitFinish(n int) {	f.finishCond.L.Lock()	for f.finished < n || len(f.pendings) != 0 {		// 等待通知		f.finishCond.Wait()	}	f.finishCond.L.Unlock()}func (f *fifo) run() {	...	for {		var todo Job		...		// 完成一个上下文		todo(f.ctx)		// 加锁		f.finishCond.L.Lock()		f.finished++		f.pendings = f.pendings[1:]		// 广播通知唤醒其他所有goroutine		f.finishCond.Broadcast()		f.finishCond.L.Unlock()	}}


参考文献

  • https://draveness.me/golang/docs/part3-runtime/ch06-concurrency/golang-sync-primitives
  • https://www.shikanon.com/2019/%E7%BC%96%E7%A8%8B%E8%AF%AD%E8%A8%80/golang%E7%9A%84sync%E5%8C%85%E7%B3%BB%E5%88%97%E8%AE%B2%E8%A7%A3-1-sync-Map/
  • https://www.shikanon.com/2019/%E7%BC%96%E7%A8%8B%E8%AF%AD%E8%A8%80/%E6%B5%85%E8%B0%88golang%E7%9A%84sync%E5%8C%85/