Golang sync包提供了一些基础的异步操作方法,非常值得学习,这里对sync包几个重要的结构体和方法做个介绍。
sync包是 golang 一个官方的异步库,提供了一些各种基础的异步的实现,如互斥锁等。sync 包主要包括了以下几种类型:
sync.Once 是 Golang package 中使方法只执行一次的对象实现,作用与 init 函数类似。 sync.Once比较多用在初始化,注册和对象创建上。sync.Once 源码:
type Once struct { // done 表示被执行标识 // 同时因为 done 是高频放在结构体的第一个位置, // 可以通过结构体指针直接进行访问,而访问其他的 // 字段需要通过偏移量计算相对就会慢一些 done uint32 m Mutex}// Once结构体Do方法只会被执行一次// ,执行一次后 done 被设置为 1, 后面不再执行func (o *Once) Do(f func()) { if atomic.LoadUint32(&o.done) == 0 { // 加锁执行,这里没办法用 // if atomic.CompareAndSwapUint32(&o.done, 0, 1) { // f() // } // 这种形式,这样做等于在f()执行之前已经赋值,如果这时候有两个 // 同时进行调用,一个会执行f(),另一个则会在f()执行结束之前就返回 // 这样会存在问题,因此doSlow加了锁,同时在f()执行之后,再将done存储为1 o.doSlow(f) }}func (o *Once) doSlow(f func()) { o.m.Lock() defer o.m.Unlock() // 这里是做了 double check if o.done == 0 { // 在func执行之后再将done值改变 defer atomic.StoreUint32(&o.done, 1) f() }}
sync.Once 其中一个应用就是实现一个单例模式,比如在一个结构体中申明,并在初始化的时候使用:
var capInstance struct { once sync.Once lock sync.Mutex capabilities *Capabilities}// Initialize the capability set. This can only be done once per binary, subsequent calls are ignored.func Initialize(c Capabilities) { // Only do this once capInstance.once.Do(func() { capInstance.capabilities = &c })}
sync.Map是一个线程安全的map结构,一般用于多读少写的并发操作,下图是sync.Map的数据结构
type Map struct { mu Mutex read atomic.Value // readOnly dirty map[interface{}]*entry misses int}
mu是Map的互斥锁用于对并发操作进行加锁保护,read是用于存储只读内容的,可以提供高并发的读操作。 dirty是一个原始的map结构体,对dirty的操作需要加锁,dirty包涵了全量的数据,在读数据的时候会先读取read,read读取不到再读dirty。 misses 是read读取失败的次数,当多次读取失败后 misses 累计特定值,dirty就会升级成read。sync.Map 这里采用的策略类似数据库常用的”读写分离”,技术都是相通的O(∩_∩)O
func main() { var value sync.Map // 写入 value.Store("your name", "shi") value.Store("her name", "kanon") // 读取 name, ok := value.Load("your name") if !ok { println("can't find name") } fmt.Println(name) // 遍历 value.Range(func(ki, vi interface{}) bool { k, v := ki.(string), vi.(string) fmt.Println(k, v) return true }) // 删除 value.Delete("your name") // 读取,如果不存在则写入 activename, loaded := value.LoadOrStore("his name", "baba") fmt.Println(activename.(string), loaded)}
sync.Pool 是一个用来缓存大量重复对象,减少大量对象创建给GC压力,是sync异步包中很重要的一种数据结构,看其基本数据结构:
type Pool struct { // noCopy 表示不支持值拷贝,如果出现值拷贝用 go vet 编译检查的时候会报错 noCopy noCopy // [P]poolLocal,表示每个local的P池 local unsafe.Pointer // local的长度 localSize uintptr // 也是[P]poolLocal,表示上一个生命周期的local victim unsafe.Pointer // victim的长度 victimSize uintptr // 用于创建新对象方法,get获取不到就会调用创建一个新对象,一般由用户传入 New func() interface{}}
sync.Pool的用法很简单,就三个方法:
//初始化pool对象var pool sync.Pooltype shikanon struct { num int}// 创建新对象创建方法func initPool() { pool = sync.Pool{ New: func() interface{} { return &shikanon{num: rand.Int()} }, }}func main() { initPool() // 从pool对象池中取对象 p1 := pool.Get().(*shikanon) fmt.Println("p1", p1.num) // 将对象放入pool对象池 pool.Put(p1) p2 := pool.Get().(*shikanon) fmt.Println("p2", p2.num)}
sync.Cond 是用于条件变量(condition variable)实现 —— 它可以让 Goroutine 都在满足特定条件时被唤醒,因此通常和锁一起使用,比如 Mutex 或 RWMutex。Cond 就是 condition 的意思。
sync.Cond 的数据结构:
type Cond struct { // noCopy 保证结构体不在编译期间被拷贝,如果出现值拷贝用 go vet 编译检查的时候会报错 noCopy noCopy // 等待条件的锁 L Locker // 通知列表 notify notifyList // 用于禁止运行期间发生的拷贝 checker copyChecker}
从数据结构可以看出,sync.Cond 等于在sync.Mutext的基础上,增加了一个通知列表notify做条件通知。
sync.Cond 主要有三种方法:等待通知(wait),单发通知(signal),广播通知(broadcast)。
// 生成一个cond,需要传入一个Locker,// 因为阻塞等待通知的操作以及通知解除阻塞的操作就是基于Locker来实现的。func NewCond(l Locker) *Cond { return &Cond{L: l}}// 用于等待通知func (c *Cond) Wait() { // 检查cond是否被拷贝 c.checker.check() // 将获得锁的goroutine加入等待队列 t := runtime_notifyListAdd(&c.notify) c.L.Unlock() // 将当前 Goroutine 追加到notifyList链表的末端,并让其处于休眠状态,这个操作是阻塞的, // 让当前 goroutine 休眠主要是通过调用 runtime.goparkunlock 实现 runtime_notifyListWait(&c.notify, t) c.L.Lock()}// 用于发送单个通知func (c *Cond) Signal() { c.checker.check() runtime_notifyListNotifyOne(&c.notify)}// 用于广播func (c *Cond) Broadcast() { c.checker.check() runtime_notifyListNotifyAll(&c.notify)}
runtime_notifyListNotifyOne、
runtime_notifyListNotifyAll、runtime_notifyListAdd、runtime_notifyListWait
这几个函数都是用的runtime下的sema.go文件link过来的,这里先不深究,看看notifyList数据结构:
type notifyList struct { // wait is the ticket number of the next waiter. It is atomically // incremented outside the lock. wait uint32 // notify is the ticket number of the next waiter to be notified. It can // be read outside the lock, but is only written to with lock held. // // Both wait & notify can wrap around, and such cases will be correctly // handled as long as their "unwrapped" difference is bounded by 2^31. // For this not to be the case, we'd need to have 2^31+ goroutines // blocked on the same condvar, which is currently not possible. notify uint32 // List of parked waiters. lock mutex head *sudog tail *sudog}
sync.Cond 主要用于消息广播中(主要是单通知大家用的更多地是 channel + select )。比较经典的sync.Cond实现有etcd的 FIFO Scheduler 的实现:
func NewFIFOScheduler() Scheduler { f := &fifo{ resume: make(chan struct{}, 1), donec: make(chan struct{}, 1), } // 生成一个Cond对象 f.finishCond = sync.NewCond(&f.mu) f.ctx, f.cancel = context.WithCancel(context.Background()) go f.run() return f}// WaitFinish 用于 等待至少 n 个任务被完成 或所有 pending任务被完成func (f *fifo) WaitFinish(n int) { f.finishCond.L.Lock() for f.finished < n || len(f.pendings) != 0 { // 等待通知 f.finishCond.Wait() } f.finishCond.L.Unlock()}func (f *fifo) run() { ... for { var todo Job ... // 完成一个上下文 todo(f.ctx) // 加锁 f.finishCond.L.Lock() f.finished++ f.pendings = f.pendings[1:] // 广播通知唤醒其他所有goroutine f.finishCond.Broadcast() f.finishCond.L.Unlock() }}